In this post, we show how to configure a Spring Batch job that reads data from a CSV file and writes it into a MySQL database.
Project structure
This is the directory structure of a standard Gradle project.
Project dependencies
task wrapper(type: Wrapper) {
    gradleVersion = '3.2.1'
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'

sourceCompatibility = 1.8

repositories {
    mavenLocal()
    mavenCentral()
}

dependencies {
    compile 'org.springframework:spring-oxm:4.3.7.RELEASE'
    compileOnly('org.projectlombok:lombok:1.16.12')
    runtime('mysql:mysql-connector-java')
    compile('org.springframework.boot:spring-boot-starter-batch:1.5.2.RELEASE')
    testCompile('org.springframework.boot:spring-boot-starter-test:1.5.2.RELEASE')
}

buildscript {
    repositories {
        mavenLocal()
        jcenter()
    }
    dependencies {
        classpath "org.springframework.boot:spring-boot-gradle-plugin:1.5.2.RELEASE"
    }
}
application.properties file
spring.datasource.url=jdbc:mysql://localhost:3306/spring_batch
spring.datasource.username=root
spring.datasource.password=santosh
spring.jpa.hibernate.ddl-auto=update
Spring Batch Jobs
CSV file
Walking,Techie
Sachin,Tendulkar
Justin,Doe
Jane,Doe
John,Doe
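To make the mapping from a CSV line to named fields concrete, here is a minimal plain-Java sketch. It splits one line on commas and pairs the tokens with the field names firstName and lastName. It only approximates what DelimitedLineTokenizer and BeanWrapperFieldSetMapper do in the job configuration (no quoting rules, no type conversion), and CsvLineSketch and tokenize are hypothetical names used for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CsvLineSketch {

    // Field names in column order, matching the names configured for person.csv.
    static final String[] NAMES = {"firstName", "lastName"};

    // Splits one comma-delimited line and pairs each token with its field name.
    static Map<String, String> tokenize(String line) {
        String[] tokens = line.split(",", -1); // -1 keeps trailing empty tokens
        if (tokens.length != NAMES.length) {
            throw new IllegalArgumentException(
                    "Expected " + NAMES.length + " tokens but found " + tokens.length + ": " + line);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < NAMES.length; i++) {
            fields.put(NAMES[i], tokens[i].trim());
        }
        return fields;
    }

    public static void main(String[] args) {
        // prints {firstName=Sachin, lastName=Tendulkar}
        System.out.println(tokenize("Sachin,Tendulkar"));
    }
}
```

A line with the wrong number of columns is rejected up front in this sketch, so a malformed row fails at read time rather than producing a half-filled object.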
Write a SQL script to create a table to store the data.
DROP TABLE IF EXISTS people;
CREATE TABLE people (
    person_id BIGINT AUTO_INCREMENT NOT NULL PRIMARY KEY,
    first_name VARCHAR(20),
    last_name VARCHAR(20)
) ENGINE=InnoDB DEFAULT CHARSET=latin1 AUTO_INCREMENT=1;
Create a job that reads the CSV file and writes its records into the MySQL database.
package com.walking.techie.csvtomysql.jobs;

import com.walking.techie.csvtomysql.listener.JobCompletionNotificationListener;
import com.walking.techie.csvtomysql.model.Person;
import com.walking.techie.csvtomysql.processor.PersonItemProcessor;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    // tag::readerwriterprocessor[]
    @Bean
    public FlatFileItemReader<Person> reader() {
        FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("person.csv"));
        reader.setLineMapper(new DefaultLineMapper<Person>() {{
            setLineTokenizer(new DelimitedLineTokenizer() {{
                setNames(new String[]{"firstName", "lastName"});
            }});
            setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                setTargetType(Person.class);
            }});
        }});
        return reader;
    }

    @Bean
    public PersonItemProcessor processor() {
        return new PersonItemProcessor();
    }

    @Bean
    public JdbcBatchItemWriter<Person> writer() {
        JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<>();
        writer.setItemSqlParameterSourceProvider(
                new BeanPropertyItemSqlParameterSourceProvider<Person>());
        writer.setSql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)");
        writer.setDataSource(dataSource);
        return writer;
    }
    // end::readerwriterprocessor[]

    // tag::jobstep[]
    @Bean
    public Job importUserJob(JobCompletionNotificationListener listener) {
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(step1())
                .end()
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Person, Person>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }
    // end::jobstep[]
}
The job maps each line of the CSV file to a Person object and writes it to the MySQL database.
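The step in this configuration is chunk-oriented: items are read until a chunk of up to 10 is collected (the value passed to chunk(10)), each item is processed, and the whole chunk is handed to the writer in one call. The following is a minimal plain-Java sketch of that loop, not Spring Batch's actual implementation. ChunkSketch and runStep are hypothetical names, and transactions, restart, and skip handling are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class ChunkSketch {

    // Reads up to chunkSize items, processes them, then writes the chunk in one call.
    // Returns the number of chunks handed to the writer.
    static <I, O> int runStep(Iterator<I> reader, Function<I, O> processor,
                              Consumer<List<O>> writer, int chunkSize) {
        int chunks = 0;
        while (reader.hasNext()) {
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < chunkSize && reader.hasNext()) {
                inputs.add(reader.next());
            }
            List<O> outputs = new ArrayList<>();
            for (I item : inputs) {
                O result = processor.apply(item);
                if (result != null) { // a null result filters the item out
                    outputs.add(result);
                }
            }
            writer.accept(outputs);
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Walking", "Sachin", "Justin", "Jane", "John");
        Function<String, String> upperCase = name -> name.toUpperCase();
        Consumer<List<String>> printer = chunk -> System.out.println("Writing " + chunk);
        // Five items with a chunk size of 2 are written as three chunks.
        int chunks = runStep(names.iterator(), upperCase, printer, 2);
        System.out.println(chunks + " chunks written");
    }
}
```

With the five-row person.csv and a chunk size of 10, the whole file fits in a single chunk, which matches the console output where all five items are converted before the listener reports the results.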
A Java model class
package com.walking.techie.csvtomysql.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

@AllArgsConstructor
@NoArgsConstructor
@Data
@ToString
public class Person {

    private String firstName;
    private String lastName;
}
A custom processor class that processes each Person object and converts its first and last
name to upper case.
package com.walking.techie.csvtomysql.processor;

import com.walking.techie.csvtomysql.model.Person;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;

@Slf4j
public class PersonItemProcessor implements ItemProcessor<Person, Person> {

    @Override
    public Person process(Person person) throws Exception {
        final String firstName = person.getFirstName().toUpperCase();
        final String lastName = person.getLastName().toUpperCase();
        final Person transformedPerson = new Person(firstName, lastName);
        log.info("Converting (" + person + ") into (" + transformedPerson + ")");
        return transformedPerson;
    }
}
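The transformation itself is easy to check outside Spring Batch. The sketch below repeats the upper-casing logic in plain Java with two additions that are assumptions, not part of the processor shown here: it tolerates null fields, and it passes Locale.ROOT so the result does not depend on the JVM's default locale. UpperCaseSketch and its nested Person class are hypothetical stand-ins for the Lombok-based model.

```java
import java.util.Locale;

public class UpperCaseSketch {

    // Plain stand-in for the Lombok-generated Person model.
    static final class Person {
        final String firstName;
        final String lastName;

        Person(String firstName, String lastName) {
            this.firstName = firstName;
            this.lastName = lastName;
        }

        @Override
        public String toString() {
            return "Person(firstName=" + firstName + ", lastName=" + lastName + ")";
        }
    }

    // Upper-cases a value, leaving null untouched instead of throwing.
    static String upper(String value) {
        return value == null ? null : value.toUpperCase(Locale.ROOT);
    }

    // Returns a new Person rather than mutating the input.
    static Person process(Person person) {
        return new Person(upper(person.firstName), upper(person.lastName));
    }

    public static void main(String[] args) {
        // prints Person(firstName=SACHIN, lastName=TENDULKAR)
        System.out.println(process(new Person("Sachin", "Tendulkar")));
    }
}
```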
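JobCompletionNotificationListener extends JobExecutionListenerSupport, a listener base class with
callbacks that run before a job starts and after it completes. Here it overrides afterJob to log the rows written to the people table.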
package com.walking.techie.csvtomysql.listener;

import com.walking.techie.csvtomysql.model.Person;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

    private final JdbcTemplate jdbcTemplate;

    public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("!!! JOB FINISHED! Time to verify the results");
            List<Person> results = jdbcTemplate
                    .query("SELECT first_name, last_name FROM people", new RowMapper<Person>() {
                        @Override
                        public Person mapRow(ResultSet rs, int row) throws SQLException {
                            return new Person(rs.getString(1), rs.getString(2));
                        }
                    });
            for (Person person : results) {
                log.info("Found <" + person + "> in the database.");
            }
        }
    }
}
Run Application
package com.walking.techie;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
Output
This application reads data from the person.csv file and writes the records to the people table. You can verify the records from the console output.
Output on console
2017-03-26 19:37:24.295 INFO 6452 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=importUserJob]] launched with the following parameters: [{run.id=4}]
2017-03-26 19:37:24.313 INFO 6452 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
2017-03-26 19:37:24.331 INFO 6452 --- [ main] c.w.t.c.processor.PersonItemProcessor : Converting (Person(firstName=Walking, lastName=Techie)) into (Person(firstName=WALKING, lastName=TECHIE))
2017-03-26 19:37:24.331 INFO 6452 --- [ main] c.w.t.c.processor.PersonItemProcessor : Converting (Person(firstName=Sachin, lastName=Tendulkar)) into (Person(firstName=SACHIN, lastName=TENDULKAR))
2017-03-26 19:37:24.331 INFO 6452 --- [ main] c.w.t.c.processor.PersonItemProcessor : Converting (Person(firstName=Justin, lastName=Doe)) into (Person(firstName=JUSTIN, lastName=DOE))
2017-03-26 19:37:24.331 INFO 6452 --- [ main] c.w.t.c.processor.PersonItemProcessor : Converting (Person(firstName=Jane, lastName=Doe)) into (Person(firstName=JANE, lastName=DOE))
2017-03-26 19:37:24.331 INFO 6452 --- [ main] c.w.t.c.processor.PersonItemProcessor : Converting (Person(firstName=John, lastName=Doe)) into (Person(firstName=JOHN, lastName=DOE))
2017-03-26 19:37:24.345 INFO 6452 --- [ main] .t.c.l.JobCompletionNotificationListener : !!! JOB FINISHED! Time to verify the results
2017-03-26 19:37:24.346 INFO 6452 --- [ main] .t.c.l.JobCompletionNotificationListener : Found <Person(firstName=WALKING, lastName=TECHIE)> in the database.
2017-03-26 19:37:24.346 INFO 6452 --- [ main] .t.c.l.JobCompletionNotificationListener : Found <Person(firstName=SACHIN, lastName=TENDULKAR)> in the database.
2017-03-26 19:37:24.346 INFO 6452 --- [ main] .t.c.l.JobCompletionNotificationListener : Found <Person(firstName=JUSTIN, lastName=DOE)> in the database.
2017-03-26 19:37:24.346 INFO 6452 --- [ main] .t.c.l.JobCompletionNotificationListener : Found <Person(firstName=JANE, lastName=DOE)> in the database.
2017-03-26 19:37:24.346 INFO 6452 --- [ main] .t.c.l.JobCompletionNotificationListener : Found <Person(firstName=JOHN, lastName=DOE)> in the database.
2017-03-26 19:37:24.349 INFO 6452 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=importUserJob]] completed with the following parameters: [{run.id=4}] and the following status: [COMPLETED]
Note: This code has been compiled and run on a Mac notebook with IntelliJ IDEA.
Nice article. I have to implement Spring Batch in my existing application with Gradle. Your article gave me a start. One question: how do you run Application.java from the command prompt? Does it need any args? And if I have to call it from a web page link, how do I run it?
You can run it from the terminal using either of the following approaches.
Approach-1
1. gradle clean build
2. gradle clean bootRun
Approach-2
1. gradle clean build
2. java -jar -Dspring.profiles.active=dev build/libs/jar[jar location]
Nice article. Very helpful for a new guy learning Spring Batch and Boot. By the way, what theme are you using in IntelliJ? It looks really cool.
Found it very helpful.
In this case you always drop the old table if it exists and create a new one. What if I want to insert data into a table that is already created?
A very good article, well written for newcomers as well as for people advanced in databases.
When I run it, it asks me to edit configurations(?). What do I edit? Thank you.
It is good and helpful, if the deep dependency on Spring (nearly every line involves beans) won't be a problem for you.