In a previous post, we discussed reading data from CSV files where all the files have the same delimiter.
In this post, we will discuss how to read data from multiple CSV files where each file uses a different delimiter, while sharing a single common reader and writer. The job's responsibility is to read the data from the CSV files and write it into MongoDB.
Project structure
This is the directory structure of a standard Gradle project.
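A sketch of the layout, inferred from the packages and resources used in this post (the exact tree may differ):

src/main/java/com/walking/techie/Application.java
src/main/java/com/walking/techie/csvtomongo/config/BatchConfiguration.java
src/main/java/com/walking/techie/csvtomongo/jobs/CommaSeparatedFileReaderJob.java
src/main/java/com/walking/techie/csvtomongo/jobs/PipeSeparatedFileReaderJob.java
src/main/java/com/walking/techie/csvtomongo/listener/CommaSeparatorStepListener.java
src/main/java/com/walking/techie/csvtomongo/listener/PipeSeparatorStepListener.java
src/main/java/com/walking/techie/csvtomongo/model/Domain.java
src/main/resources/application.properties
src/main/resources/comma-separated.csv
src/main/resources/pipe-separated.csv
build.gradle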
Project dependencies
task wrapper(type: Wrapper) {
    gradleVersion = '3.2.1'
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'

sourceCompatibility = 1.8

repositories {
    mavenLocal()
    mavenCentral()
}

dependencies {
    compile 'org.springframework.data:spring-data-mongodb:1.9.8.RELEASE'
    compileOnly('org.projectlombok:lombok:1.16.12')
    compile('org.springframework.boot:spring-boot-starter-batch:1.5.2.RELEASE')
    testCompile('org.springframework.boot:spring-boot-starter-test:1.5.2.RELEASE')
}

buildscript {
    repositories {
        mavenLocal()
        jcenter()
    }
    dependencies {
        classpath "org.springframework.boot:spring-boot-gradle-plugin:1.5.2.RELEASE"
    }
}
application.properties file
spring.data.mongodb.host=127.0.0.1
spring.data.mongodb.port=27017
spring.data.mongodb.database=springbatch
Spring Batch Configuration
Here, the BatchConfiguration class configures the FlatFileItemReader as the common reader and the MongoItemWriter as the common writer. This class supplies the file name and delimiter appropriate to whichever step is about to execute. We have annotated the genericReader method with @StepScope, so for every step execution a fresh reader is created with that step's file name and delimiter.
package com.walking.techie.csvtomongo.config;

import com.walking.techie.csvtomongo.model.Domain;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.data.MongoItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.mongodb.core.MongoTemplate;

@Slf4j
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

  @Autowired
  private MongoTemplate mongoTemplate;

  @Bean(destroyMethod = "")
  @StepScope
  public FlatFileItemReader<Domain> genericReader(
      @Value("#{stepExecutionContext['fileName']}") String fileName,
      @Value("#{stepExecutionContext['delimiter']}") String delimiter) {
    log.info("fileName:{}", fileName);
    log.info("delimiter:{}", delimiter);
    FlatFileItemReader<Domain> reader = new FlatFileItemReader<Domain>();
    reader.setResource(new ClassPathResource(fileName));
    reader.setLineMapper(new DefaultLineMapper<Domain>() {{
      setLineTokenizer(new DelimitedLineTokenizer() {{
        setNames(new String[]{"id", "name"});
        setDelimiter(delimiter);
      }});
      setFieldSetMapper(new BeanWrapperFieldSetMapper<Domain>() {{
        setTargetType(Domain.class);
      }});
    }});
    return reader;
  }

  @Bean
  public MongoItemWriter<Domain> writer() {
    MongoItemWriter<Domain> writer = new MongoItemWriter<Domain>();
    writer.setTemplate(mongoTemplate);
    writer.setCollection("domain");
    return writer;
  }
}
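To make the reader configuration more concrete, here is a minimal standalone sketch of how DelimitedLineTokenizer and BeanWrapperFieldSetMapper cooperate on a single raw line. The LineMappingDemo class is a hypothetical illustration, not part of the project:

package com.walking.techie.csvtomongo.config;

import com.walking.techie.csvtomongo.model.Domain;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;

// Hypothetical demo (not part of the project): turn one pipe-delimited
// line into a Domain object, exactly as the step-scoped reader does.
public class LineMappingDemo {

  public static void main(String[] args) throws Exception {
    // Split the line into fields named "id" and "name".
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    tokenizer.setNames(new String[]{"id", "name"});
    tokenizer.setDelimiter("|");
    FieldSet fieldSet = tokenizer.tokenize("100|walkingtechie.blogspot.com");

    // Bind the named fields onto the Domain bean by property name.
    BeanWrapperFieldSetMapper<Domain> mapper = new BeanWrapperFieldSetMapper<Domain>();
    mapper.setTargetType(Domain.class);
    Domain domain = mapper.mapFieldSet(fieldSet);

    System.out.println(domain); // Domain(id=100, name=walkingtechie.blogspot.com)
  }
}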
Custom Step Listeners
We have created two custom step listeners: CommaSeparatorStepListener for the CommaSeparatedFileReaderJob job, and PipeSeparatorStepListener for the PipeSeparatedFileReaderJob job. Each step listener sets the values of the file name and delimiter in the StepExecution context before its step runs.
package com.walking.techie.csvtomongo.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CommaSeparatorStepListener implements StepExecutionListener {

  @Override
  public void beforeStep(StepExecution stepExecution) {
    log.info("comma step listener called");
    stepExecution.getExecutionContext().putString("fileName", "comma-separated.csv");
    stepExecution.getExecutionContext().putString("delimiter", ",");
  }

  @Override
  public ExitStatus afterStep(StepExecution stepExecution) {
    return null;
  }
}
package com.walking.techie.csvtomongo.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class PipeSeparatorStepListener implements StepExecutionListener {

  @Override
  public void beforeStep(StepExecution stepExecution) {
    log.info("pipe step listener called");
    stepExecution.getExecutionContext().putString("fileName", "pipe-separated.csv");
    stepExecution.getExecutionContext().putString("delimiter", "|");
  }

  @Override
  public ExitStatus afterStep(StepExecution stepExecution) {
    return null;
  }
}
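A quick way to see a listener's effect in isolation is to invoke beforeStep by hand. The ListenerCheck class below is a hypothetical sketch, not part of the original project:

package com.walking.techie.csvtomongo.listener;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;

// Hypothetical check (not part of the project): call the listener directly
// and print the values it seeds into the step's ExecutionContext.
public class ListenerCheck {

  public static void main(String[] args) {
    StepExecution stepExecution = new StepExecution("commaSeparatedStep", new JobExecution(1L));
    new CommaSeparatorStepListener().beforeStep(stepExecution);

    System.out.println(stepExecution.getExecutionContext().getString("fileName"));  // comma-separated.csv
    System.out.println(stepExecution.getExecutionContext().getString("delimiter")); // ,
  }
}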
CSV files
A comma-separated CSV file.

1,walkingtechie.blogspot.com
2,google.com
3,facebook.com
4,twitter.com

A pipe-separated CSV file.

100|walkingtechie.blogspot.com
200|amazon.com
300|flipkart.com
400|google.com
500|github.com
Spring Batch Jobs
Create two jobs that read the CSV files and write into the Mongo database. One job reads data from the comma-separated CSV file, and the other reads data from the pipe-separated CSV file. Both jobs use the common reader to read data from a CSV file and the common writer to write the data into MongoDB.
package com.walking.techie.csvtomongo.jobs;

import com.walking.techie.csvtomongo.listener.CommaSeparatorStepListener;
import com.walking.techie.csvtomongo.model.Domain;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.data.MongoItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CommaSeparatedFileReaderJob {

  @Autowired
  private JobBuilderFactory jobBuilderFactory;
  @Autowired
  private StepBuilderFactory stepBuilderFactory;
  @Autowired
  private FlatFileItemReader<Domain> commaSeparatorReader;
  @Autowired
  private MongoItemWriter<Domain> mongoItemWriter;

  @Bean
  public Job commaDelimiterJob() {
    return jobBuilderFactory.get("commaDelimiterJob").incrementer(new RunIdIncrementer())
        .start(commaSeparatedStep()).build();
  }

  @Bean
  public Step commaSeparatedStep() {
    return stepBuilderFactory.get("commaSeparatedStep").<Domain, Domain>chunk(10)
        .reader(commaSeparatorReader).writer(mongoItemWriter).listener(commaStepListener()).build();
  }

  @Bean
  public CommaSeparatorStepListener commaStepListener() {
    return new CommaSeparatorStepListener();
  }
}
package com.walking.techie.csvtomongo.jobs;

import com.walking.techie.csvtomongo.listener.PipeSeparatorStepListener;
import com.walking.techie.csvtomongo.model.Domain;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.data.MongoItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PipeSeparatedFileReaderJob {

  @Autowired
  private JobBuilderFactory jobBuilderFactory;
  @Autowired
  private StepBuilderFactory stepBuilderFactory;
  @Autowired
  private FlatFileItemReader<Domain> pipeSeparatorReader;
  @Autowired
  private MongoItemWriter<Domain> mongoItemWriter;

  @Bean
  public Job pipeDelimiterJob() {
    return jobBuilderFactory.get("pipeDelimiterJob").incrementer(new RunIdIncrementer())
        .start(pipeSeparatedStep())
        .build();
  }

  @Bean
  public Step pipeSeparatedStep() {
    return stepBuilderFactory.get("step1").<Domain, Domain>chunk(10).reader(pipeSeparatorReader)
        .writer(mongoItemWriter).listener(pipeStepListener()).build();
  }

  @Bean
  public PipeSeparatorStepListener pipeStepListener() {
    return new PipeSeparatorStepListener();
  }
}
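With this configuration Spring Boot launches every Job bean at startup, so both jobs run one after the other, as the console output below shows. If you ever need to launch only one of them, Spring Boot 1.x lets you restrict which jobs run via a standard property in application.properties, for example:

# run only the comma-delimited job at startup
spring.batch.job.names=commaDelimiterJob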
The CSV values are mapped onto a Domain object, which is then written to MongoDB.
A Java model class
package com.walking.techie.csvtomongo.model;

import lombok.Data;

@Data
public class Domain {

  private int id;
  private String name;
}
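Lombok's @Data generates the getters and setters at compile time, and BeanWrapperFieldSetMapper relies on those setters to populate the bean. If you prefer not to use Lombok, a roughly equivalent plain Java version would be:

package com.walking.techie.csvtomongo.model;

// Equivalent model without Lombok (@Data would also generate
// equals/hashCode/toString, omitted here for brevity).
public class Domain {

  private int id;
  private String name;

  public int getId() { return id; }
  public void setId(int id) { this.id = id; }

  public String getName() { return name; }
  public void setName(String name) { this.name = name; }
}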
Run Application
package com.walking.techie;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class Application {

  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }
}
Output
This application will read data from the comma-separated.csv and pipe-separated.csv files using the common reader and write the records into the domain collection. You can verify the records in MongoDB.
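You could also verify the records programmatically. The VerifyRecords class below is a hypothetical helper, not part of the original project, assuming MongoDB is running with the settings from application.properties:

package com.walking.techie;

import com.mongodb.MongoClient;
import com.walking.techie.csvtomongo.model.Domain;
import org.springframework.data.mongodb.core.MongoTemplate;

// Hypothetical helper (not part of the project): read back everything
// the jobs wrote into the "domain" collection.
public class VerifyRecords {

  public static void main(String[] args) {
    MongoTemplate mongoTemplate =
        new MongoTemplate(new MongoClient("127.0.0.1", 27017), "springbatch");
    mongoTemplate.findAll(Domain.class, "domain")
        .forEach(d -> System.out.println(d.getId() + " -> " + d.getName()));
  }
}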
Output in console
2017-07-10 23:05:07.688  INFO 6750 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=commaDelimiterJob]] launched with the following parameters: [{run.id=1}]
2017-07-10 23:05:07.711  INFO 6750 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [commaSeparatedStep]
2017-07-10 23:05:07.715  INFO 6750 --- [           main] c.w.t.c.l.CommaSeparatorStepListener     : comma step listener called
2017-07-10 23:05:07.738  INFO 6750 --- [           main] c.w.t.c.config.BatchConfiguration        : fileName:comma-separated.csv
2017-07-10 23:05:07.739  INFO 6750 --- [           main] c.w.t.c.config.BatchConfiguration        : delimiter:,
2017-07-10 23:05:07.813  INFO 6750 --- [           main] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:2, serverValue:54}] to 127.0.0.1:27017
2017-07-10 23:05:07.847  INFO 6750 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=commaDelimiterJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED]
2017-07-10 23:05:07.850  INFO 6750 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=pipeDelimiterJob]] launched with the following parameters: [{run.id=1}]
2017-07-10 23:05:07.858  INFO 6750 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
2017-07-10 23:05:07.860  INFO 6750 --- [           main] c.w.t.c.l.PipeSeparatorStepListener      : pipe step listener called
2017-07-10 23:05:07.861  INFO 6750 --- [           main] c.w.t.c.config.BatchConfiguration        : fileName:pipe-separated.csv
2017-07-10 23:05:07.862  INFO 6750 --- [           main] c.w.t.c.config.BatchConfiguration        : delimiter:|
2017-07-10 23:05:07.877  INFO 6750 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=pipeDelimiterJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED]
Note: This code has been compiled and run on a Mac notebook with IntelliJ IDEA.