Tuesday, July 11, 2017

Spring Batch Example in Spring boot - CSV Files with different delimiter to Mongo Database

In the previous post, we discussed reading data from CSV files where all the files share the same delimiter.

In this post, we will discuss how to read data from multiple CSV files where each file uses a different delimiter, while still sharing a common reader and writer. The job's responsibility is to read the data from the CSV files and write it into MongoDB.

Project structure

This is the directory structure of a standard Gradle project.

(Image: project structure of the Spring Boot example - CSV files with different delimiters to MongoDB)

Project dependencies (build.gradle)

task wrapper(type: Wrapper) {
    gradleVersion = '3.2.1'
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'

sourceCompatibility = 1.8

repositories {
    mavenLocal()
    mavenCentral()
}


dependencies {
    compile 'org.springframework.data:spring-data-mongodb:1.9.8.RELEASE'
    compileOnly('org.projectlombok:lombok:1.16.12')
    compile('org.springframework.boot:spring-boot-starter-batch:1.5.2.RELEASE')
    testCompile('org.springframework.boot:spring-boot-starter-test:1.5.2.RELEASE')
}
buildscript {
    repositories {
        mavenLocal()
        jcenter()
    }
    dependencies {
        classpath "org.springframework.boot:spring-boot-gradle-plugin:1.5.2.RELEASE"
    }
}
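
The spring-boot-starter-batch starter brings in Spring Batch together with Boot's batch auto-configuration, while spring-data-mongodb (and the Mongo Java driver it pulls in transitively) provides the MongoTemplate used by the writer. Lombok is only needed at compile time for the @Slf4j and @Data annotations.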

application.properties file

spring.data.mongodb.host=127.0.0.1
spring.data.mongodb.port=27017
spring.data.mongodb.database=springbatch
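
Spring Boot reads these properties to auto-configure the MongoTemplate that is injected into BatchConfiguration, pointing it at the springbatch database on the local MongoDB instance.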

Spring Batch Configuration

Here, the BatchConfiguration class configures a FlatFileItemReader as the common reader and a MongoItemWriter as the common writer. The reader picks up the file name and delimiter of whichever step is about to execute: the genericReader method is annotated with @StepScope, so a fresh reader is created for every step execution, and the fileName and delimiter values are resolved from that step's execution context.

package com.walking.techie.csvtomongo.config;

import com.walking.techie.csvtomongo.model.Domain;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.data.MongoItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.mongodb.core.MongoTemplate;

@Slf4j
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

  @Autowired
  private MongoTemplate mongoTemplate;

  @Bean(destroyMethod = "")
  @StepScope
  public FlatFileItemReader<Domain> genericReader(
      @Value("#{stepExecutionContext['fileName']}") String fileName,
      @Value("#{stepExecutionContext['delimiter']}") String delimiter) {
    log.info("fileName:{}", fileName);
    log.info("delimiter:{}", delimiter);
    FlatFileItemReader<Domain> reader = new FlatFileItemReader<Domain>();
    reader.setResource(new ClassPathResource(fileName));
    reader.setLineMapper(new DefaultLineMapper<Domain>() {{
      setLineTokenizer(new DelimitedLineTokenizer() {{
        setNames(new String[]{"id", "name"});
        setDelimiter(delimiter);
      }});
      setFieldSetMapper(new BeanWrapperFieldSetMapper<Domain>() {{
        setTargetType(Domain.class);
      }});
    }});
    return reader;
  }


  @Bean
  public MongoItemWriter<Domain> writer() {
    MongoItemWriter<Domain> writer = new MongoItemWriter<Domain>();
    writer.setTemplate(mongoTemplate);
    writer.setCollection("domain");
    return writer;
  }
}
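
Two details in this class are easy to miss: destroyMethod = "" on the @Bean stops Spring from calling the reader's close() method at context shutdown (the step opens and closes the reader itself), and @StepScope defers creation of the reader until a step actually runs, which is what lets the #{stepExecutionContext[...]} expressions be resolved against that particular step's execution context.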

Custom Step Listeners

We have created two custom step listeners: CommaSeparatorStepListener for the CommaSeparatedFileReaderJob configuration and PipeSeparatorStepListener for the PipeSeparatedFileReaderJob configuration. In beforeStep, each listener puts the file name and delimiter into the step's ExecutionContext under the keys fileName and delimiter, which are exactly the keys the #{stepExecutionContext['fileName']} and #{stepExecutionContext['delimiter']} expressions in genericReader resolve when the step runs.

package com.walking.techie.csvtomongo.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CommaSeparatorStepListener implements StepExecutionListener {

  @Override
  public void beforeStep(StepExecution stepExecution) {
    log.info("comma step listener called");
    stepExecution.getExecutionContext().putString("fileName", "comma-separated.csv");
    stepExecution.getExecutionContext().putString("delimiter", ",");
  }

  @Override
  public ExitStatus afterStep(StepExecution stepExecution) {
    return null;
  }
}
package com.walking.techie.csvtomongo.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class PipeSeparatorStepListener implements StepExecutionListener {

  @Override
  public void beforeStep(StepExecution stepExecution) {
    log.info("pipe step listener called");
    stepExecution.getExecutionContext().putString("fileName", "pipe-separated.csv");
    stepExecution.getExecutionContext().putString("delimiter", "|");
  }

  @Override
  public ExitStatus afterStep(StepExecution stepExecution) {
    return null;
  }
}

CSV files

A comma-separated CSV file (comma-separated.csv):
1,walkingtechie.blogspot.com
2,google.com
3,facebook.com
4,twitter.com
A pipe-separated CSV file (pipe-separated.csv):
100|walkingtechie.blogspot.com
200|amazon.com
300|flipkart.com
400|google.com
500|github.com
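
Both files are loaded through ClassPathResource, so they must sit on the classpath; in the standard Gradle layout that means placing comma-separated.csv and pipe-separated.csv under src/main/resources.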

Spring Batch Jobs

Create two jobs that read the CSV files and write into the Mongo database. One job reads the comma-separated CSV file and the other reads the pipe-separated CSV file. Both jobs use the common reader to read data from their CSV file and the common writer to write the data into MongoDB.

package com.walking.techie.csvtomongo.jobs;

import com.walking.techie.csvtomongo.listener.CommaSeparatorStepListener;
import com.walking.techie.csvtomongo.model.Domain;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.data.MongoItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CommaSeparatedFileReaderJob {

  @Autowired
  private JobBuilderFactory jobBuilderFactory;
  @Autowired
  private StepBuilderFactory stepBuilderFactory;

  @Autowired
  private FlatFileItemReader<Domain> commaSeparatorReader;

  @Autowired
  private MongoItemWriter<Domain> mongoItemWriter;

  @Bean
  public Job commaDelimiterJob() {
    return jobBuilderFactory.get("commaDelimiterJob").incrementer(new RunIdIncrementer())
        .start(commaSeparatedStep()).build();
  }

  @Bean
  public Step commaSeparatedStep() {
    return stepBuilderFactory.get("commaSeparatedStep").<Domain, Domain>chunk(10)
        .reader(commaSeparatorReader).writer(mongoItemWriter).listener(commaStepListener()).build();
  }

  @Bean
  public CommaSeparatorStepListener commaStepListener() {
    return new CommaSeparatorStepListener();
  }
}
package com.walking.techie.csvtomongo.jobs;

import com.walking.techie.csvtomongo.listener.PipeSeparatorStepListener;
import com.walking.techie.csvtomongo.model.Domain;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.data.MongoItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PipeSeparatedFileReaderJob {

  @Autowired
  private JobBuilderFactory jobBuilderFactory;
  @Autowired
  private StepBuilderFactory stepBuilderFactory;

  @Autowired
  private FlatFileItemReader<Domain> pipeSeparatorReader;

  @Autowired
  private MongoItemWriter<Domain> mongoItemWriter;

  @Bean
  public Job pipeDelimiterJob() {
    return jobBuilderFactory.get("pipeDelimiterJob").incrementer(new RunIdIncrementer())
        .start(pipeSeparatedStep())
        .build();
  }

  @Bean
  public Step pipeSeparatedStep() {
    return stepBuilderFactory.get("step1").<Domain, Domain>chunk(10).reader(pipeSeparatorReader)
        .writer(mongoItemWriter).listener(pipeStepListener()).build();
  }

  @Bean
  public PipeSeparatorStepListener pipeStepListener() {
    return new PipeSeparatorStepListener();
  }
}
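
Note that both job configurations autowire the same step-scoped genericReader bean (resolved by type) and the same mongoItemWriter bean, so the reader and writer really are shared; the only thing that differs between the two steps is the listener, which decides which file and delimiter the reader will use.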

The CSV records are mapped to the Domain model object and written to MongoDB.

A Java model class

package com.walking.techie.csvtomongo.model;

import lombok.Data;

@Data
public class Domain {

  private int id;
  private String name;
}
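
Lombok's @Data generates the getters and setters that BeanWrapperFieldSetMapper needs to populate the id and name fields, along with a toString() that is handy when logging the mapped objects.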

Run Application

package com.walking.techie;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class Application {

  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }
}
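
Because DataSourceAutoConfiguration is excluded and no DataSource is defined, Spring Batch falls back to an in-memory, map-based job repository, so no relational database is needed for the job metadata. On startup, Spring Boot's batch auto-configuration launches every Job bean found in the context, which is why both commaDelimiterJob and pipeDelimiterJob show up in the output below.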

Output

This application reads the data from comma-separated.csv and pipe-separated.csv using the common reader and writes the records into the domain collection. You can verify the records in MongoDB; a small verification sketch follows the console output below.

Output in the console:

2017-07-10 23:05:07.688  INFO 6750 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=commaDelimiterJob]] launched with the following parameters: [{run.id=1}]
2017-07-10 23:05:07.711  INFO 6750 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [commaSeparatedStep]
2017-07-10 23:05:07.715  INFO 6750 --- [           main] c.w.t.c.l.CommaSeparatorStepListener     : comma step listener called
2017-07-10 23:05:07.738  INFO 6750 --- [           main] c.w.t.c.config.BatchConfiguration        : fileName:comma-separated.csv
2017-07-10 23:05:07.739  INFO 6750 --- [           main] c.w.t.c.config.BatchConfiguration        : delimiter:,
2017-07-10 23:05:07.813  INFO 6750 --- [           main] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:2, serverValue:54}] to 127.0.0.1:27017
2017-07-10 23:05:07.847  INFO 6750 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=commaDelimiterJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED]
2017-07-10 23:05:07.850  INFO 6750 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=pipeDelimiterJob]] launched with the following parameters: [{run.id=1}]
2017-07-10 23:05:07.858  INFO 6750 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
2017-07-10 23:05:07.860  INFO 6750 --- [           main] c.w.t.c.l.PipeSeparatorStepListener      : pipe step listener called
2017-07-10 23:05:07.861  INFO 6750 --- [           main] c.w.t.c.config.BatchConfiguration        : fileName:pipe-separated.csv
2017-07-10 23:05:07.862  INFO 6750 --- [           main] c.w.t.c.config.BatchConfiguration        : delimiter:|
2017-07-10 23:05:07.877  INFO 6750 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=pipeDelimiterJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED]
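
To double-check the import from code, something like the hypothetical DomainVerifier below could be added to the application. It is only a sketch (the class is not part of the original project): it dumps the domain collection through the same auto-configured MongoTemplate.

package com.walking.techie;

import com.walking.techie.csvtomongo.model.Domain;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Component;

// Hypothetical helper, not part of the original post: logs the documents
// written to the "domain" collection so the result of both jobs can be
// checked from the console.
@Slf4j
@Component
public class DomainVerifier implements CommandLineRunner {

  @Autowired
  private MongoTemplate mongoTemplate;

  @Override
  public void run(String... args) {
    // findAll(entityClass, collectionName) maps every document in the
    // "domain" collection back to the Domain model class.
    mongoTemplate.findAll(Domain.class, "domain")
        .forEach(domain -> log.info("stored in mongo: {}", domain));
  }
}

The order in which Spring Boot invokes command-line runners relative to the batch runner is not guaranteed here, so the dump may also include documents from earlier runs; querying the collection directly with db.domain.find() in the mongo shell works just as well.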

Note: This code has been compiled and run on a Mac notebook with IntelliJ IDEA.
