Spring Batch Example in Spring boot - CSV File to MySql Database - Walking Techie

Blog about Java programming, Design Pattern, and Data Structure.

Tuesday, March 28, 2017

Spring Batch Example in Spring boot - CSV File to MySql Database

In this post, we will show you how to configure a Spring Batch job to read data from a CSV file and write into mysql database.

Project structure

This is a directory structure of the standard gradle project.

Project dependencies

task wrapper(type: Wrapper) {
 gradleVersion = '3.2.1'
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'

sourceCompatibility = 1.8

repositories {
 mavenLocal()
 mavenCentral()
}


dependencies {
 compile 'org.springframework:spring-oxm:4.3.7.RELEASE'
 compileOnly('org.projectlombok:lombok:1.16.12')
 runtime('mysql:mysql-connector-java')
 compile('org.springframework.boot:spring-boot-starter-batch:1.5.2.RELEASE')
 testCompile('org.springframework.boot:spring-boot-starter-test:1.5.2.RELEASE')
}
buildscript {
 repositories {
  mavenLocal()
  jcenter()
 }
 dependencies {
  classpath "org.springframework.boot:spring-boot-gradle-plugin:1.5.2.RELEASE"
 }
}

application.properties file

spring.datasource.url=jdbc:mysql://localhost:3306/spring_batch
spring.datasource.username=root
spring.datasource.password=santosh
spring.jpa.hibernate.ddl-auto=update

Spring Batch Jobs

CSV file

Walking,Techie
Sachin,Tendulkar
Justin,Doe
Jane,Doe
John,Doe

write a SQL script to create a table to store the data.

DROP TABLE IF EXISTS people;

CREATE TABLE people  (
    person_id BIGINT AUTO_INCREMENT NOT NULL PRIMARY KEY,
    first_name VARCHAR(20),
    last_name VARCHAR(20)
)ENGINE=InnoDB DEFAULT CHARSET=latin1 AUTO_INCREMENT=1;

Create a job which will read CSV file and write into mysql database.

package com.walking.techie.csvtomysql.jobs;

import com.walking.techie.csvtomysql.listener.JobCompletionNotificationListener;
import com.walking.techie.csvtomysql.model.Person;
import com.walking.techie.csvtomysql.processor.PersonItemProcessor;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

  @Autowired
  private JobBuilderFactory jobBuilderFactory;
  @Autowired
  private StepBuilderFactory stepBuilderFactory;
  @Autowired
  private DataSource dataSource;


  // tag::readerwriterprocessor[]
  @Bean
  public FlatFileItemReader<Person> reader() {
    FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
    reader.setResource(new ClassPathResource("person.csv"));
    reader.setLineMapper(new DefaultLineMapper<Person>() {{
      setLineTokenizer(new DelimitedLineTokenizer() {{
        setNames(new String[]{"firstName", "lastName"});
      }});
      setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
        setTargetType(Person.class);
      }});
    }});
    return reader;
  }

  @Bean
  public PersonItemProcessor processor() {
    return new PersonItemProcessor();
  }

  @Bean
  public JdbcBatchItemWriter<Person> writer() {
    JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<>();
    writer.setItemSqlParameterSourceProvider(
        new BeanPropertyItemSqlParameterSourceProvider<Person>());
    writer.setSql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)");
    writer.setDataSource(dataSource);
    return writer;
  }
  // end::readerwriterprocessor[]


  // tag::jobstep[]
  @Bean
  public Job importUserJob(JobCompletionNotificationListener listener) {
    return jobBuilderFactory.get("importUserJob").incrementer(new RunIdIncrementer())
        .listener(listener).flow(step1()).end().build();
  }

  @Bean
  public Step step1() {
    return stepBuilderFactory.get("step1").<Person, Person>chunk(10).reader(reader())
        .processor(processor()).writer(writer()).build();
  }
  // end::jobstep[]
}

Map CSV file values to Student object and write to mysql database.

A Java model class

package com.walking.techie.csvtomysql.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

@AllArgsConstructor
@NoArgsConstructor
@Data
@ToString
public class Person {

  private String firstName;
  private String lastName;
}

A custom person processor class that will process each and every Student object and convert the member of person into upper case.

package com.walking.techie.csvtomysql.processor;

import com.walking.techie.csvtomysql.model.Person;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;

@Slf4j
public class PersonItemProcessor implements ItemProcessor<Person, Person> {

  @Override
  public Person process(Person person) throws Exception {
    final String firstName = person.getFirstName().toUpperCase();
    final String lastName = person.getLastName().toUpperCase();
    final Person transformedPerson = new Person(firstName, lastName);
    log.info("Converting (" + person + ") into (" + transformedPerson + ")");

    return transformedPerson;
  }
}

JobExecutionListenerSupport is a listener class that listen to job before job start and job completed.

package com.walking.techie.csvtomysql.listener;


import com.walking.techie.csvtomysql.model.Person;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

  private final JdbcTemplate jdbcTemplate;

  public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
    this.jdbcTemplate = jdbcTemplate;
  }

  @Override
  public void afterJob(JobExecution jobExecution) {
    if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
      log.info("!!! JOB FINISHED! Time to verify the results");

      List<Person> results = jdbcTemplate
          .query("SELECT first_name, last_name FROM people", new RowMapper<Person>() {
            @Override
            public Person mapRow(ResultSet rs, int row) throws SQLException {
              return new Person(rs.getString(1), rs.getString(2));
            }
          });

      for (Person person : results) {
        log.info("Found <" + person + "> in the database.");
      }

    }
  }
}

Run Application

package com.walking.techie;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

 public static void main(String[] args) {
  SpringApplication.run(Application.class, args);
 }
}

Output

This application will read data from person.csv file and write records in person table. you can verify the records from console output.

output on console

2017-03-26 19:37:24.295  INFO 6452 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=importUserJob]] launched with the following parameters: [{run.id=4}]
2017-03-26 19:37:24.313  INFO 6452 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
2017-03-26 19:37:24.331  INFO 6452 --- [           main] c.w.t.c.processor.PersonItemProcessor    : Converting (Person(firstName=Walking, lastName=Techie)) into (Person(firstName=WALKING, lastName=TECHIE))
2017-03-26 19:37:24.331  INFO 6452 --- [           main] c.w.t.c.processor.PersonItemProcessor    : Converting (Person(firstName=Sachin, lastName=Tendulkar)) into (Person(firstName=SACHIN, lastName=TENDULKAR))
2017-03-26 19:37:24.331  INFO 6452 --- [           main] c.w.t.c.processor.PersonItemProcessor    : Converting (Person(firstName=Justin, lastName=Doe)) into (Person(firstName=JUSTIN, lastName=DOE))
2017-03-26 19:37:24.331  INFO 6452 --- [           main] c.w.t.c.processor.PersonItemProcessor    : Converting (Person(firstName=Jane, lastName=Doe)) into (Person(firstName=JANE, lastName=DOE))
2017-03-26 19:37:24.331  INFO 6452 --- [           main] c.w.t.c.processor.PersonItemProcessor    : Converting (Person(firstName=John, lastName=Doe)) into (Person(firstName=JOHN, lastName=DOE))
2017-03-26 19:37:24.345  INFO 6452 --- [           main] .t.c.l.JobCompletionNotificationListener : !!! JOB FINISHED! Time to verify the results
2017-03-26 19:37:24.346  INFO 6452 --- [           main] .t.c.l.JobCompletionNotificationListener : Found <Person(firstName=WALKING, lastName=TECHIE)> in the database.
2017-03-26 19:37:24.346  INFO 6452 --- [           main] .t.c.l.JobCompletionNotificationListener : Found <Person(firstName=SACHIN, lastName=TENDULKAR)> in the database.
2017-03-26 19:37:24.346  INFO 6452 --- [           main] .t.c.l.JobCompletionNotificationListener : Found <Person(firstName=JUSTIN, lastName=DOE)> in the database.
2017-03-26 19:37:24.346  INFO 6452 --- [           main] .t.c.l.JobCompletionNotificationListener : Found <Person(firstName=JANE, lastName=DOE)> in the database.
2017-03-26 19:37:24.346  INFO 6452 --- [           main] .t.c.l.JobCompletionNotificationListener : Found <Person(firstName=JOHN, lastName=DOE)> in the database.
2017-03-26 19:37:24.349  INFO 6452 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=importUserJob]] completed with the following parameters: [{run.id=4}] and the following status: [COMPLETED]

Note : This code has been compiled and run on mac notebook and intellij IDEA.

8 comments :

  1. Nice article. I have to implement spring batch in my existing application with gradle. Your article gave a start to me. One question is how to you run Application.java from the command prompt. Does it need any args. If at all I have to call from a ewb page link, how to I run,

    ReplyDelete
    Replies
    1. You can run from terminal using the following command.
      Approach-1.
      1. gradle clean build
      2. gradle clean bootRun

      Approach-2
      1. gradle clean build
      2. java -jar -Dspring.profiles.active=dev build/libs/jar[jar location]

      Delete
  2. Nice article. very helpful for a new guy learning spring batch and boot. btw- what theme are you using in intelliJ? They are real cool.

    ReplyDelete
  3. found it very helpful

    ReplyDelete
  4. In this case u always create new table and drop old one if exist what if i want to insert data into table which is already created

    ReplyDelete
  5. a very good article, well written for newcomers as well as for advanced people in databases

    ReplyDelete
  6. when i run,it asks me to edit configurations(?) ...what do i edit ..thank you

    ReplyDelete
  7. Ä°t is good and helpful if dependency of spring (nearly every line with beans) so deep won't be problem for you.

    ReplyDelete