Spring Batch offers seven listener interfaces for intercepting step execution:
- ChunkListener
- ItemProcessListener
- ItemReadListener
- ItemWriteListener
- RetryListener
- SkipListener
- StepExecutionListener
ChunkListener is a listener interface for the lifecycle of a chunk. A chunk can be thought of as a collection of items that will be committed together.
ItemProcessListener listens for the processing of an item. Implementations are notified before and after an item is passed to the ItemProcessor, and when the processor throws an exception.
ItemReadListener is the listener interface around the reading of an item.
ItemWriteListener is the listener interface for the writing of items. Implementations are notified before and after a list of items is written, and when an exception is thrown while writing.
RetryListener comes from Spring Retry rather than Spring Batch itself. It is notified when a retryable operation is opened, each time an attempt fails, and when the operation is closed. It is not used in the example below.
SkipListener is the listener interface for skipped items. Its callbacks are invoked by Step implementations at the appropriate point in the step lifecycle.
StepExecutionListener is the listener interface for the lifecycle of a Step.
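To make the callback order concrete, here is a minimal plain-Java sketch of one chunk-oriented step. It does not use Spring Batch: ChunkFlowSketch, runStep, and the event strings are hypothetical names chosen for illustration, and the real framework adds transactions, retry, and skip handling on top of this. It only models the order in which the callbacks described above would be expected to fire in a simple step.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Simplified model of a chunk-oriented step; NOT Spring Batch code.
final class ChunkFlowSketch {

  // Runs a "step" over the input and records every listener-style callback in order.
  static List<String> runStep(List<String> input, int chunkSize) {
    List<String> events = new ArrayList<>();
    Iterator<String> reader = input.iterator();
    events.add("beforeStep");
    boolean exhausted = false;
    while (!exhausted) {
      events.add("beforeChunk");
      List<String> chunk = new ArrayList<>();
      // Read phase: read until the chunk is full or the input ends.
      while (chunk.size() < chunkSize) {
        events.add("beforeRead");
        if (!reader.hasNext()) { // models a reader returning null at end of input
          exhausted = true;
          break;                 // no afterRead for the end-of-input read
        }
        chunk.add(reader.next());
        events.add("afterRead");
      }
      // Process phase: one before/after pair per item.
      for (int i = 0; i < chunk.size(); i++) {
        events.add("beforeProcess");
        events.add("afterProcess");
      }
      // Write phase: the whole chunk is written at once.
      if (!chunk.isEmpty()) {
        events.add("beforeWrite" + chunk);
        events.add("afterWrite" + chunk);
      }
      events.add("afterChunk");
    }
    events.add("afterStep");
    return events;
  }

  public static void main(String[] args) {
    for (String event : runStep(Arrays.asList("a", "b", "c"), 2)) {
      System.out.println(event);
    }
  }
}
```

Note that the last read attempt records a beforeRead without a matching afterRead, because there is no item to report once the input is exhausted.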
Project structure
The project uses the standard Gradle directory structure.
Project dependencies
task wrapper(type: Wrapper) {
    gradleVersion = '3.2.1'
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'

sourceCompatibility = 1.8

repositories {
    mavenLocal()
    mavenCentral()
}

dependencies {
    compileOnly('org.projectlombok:lombok:1.16.12')
    compile('org.springframework.boot:spring-boot-starter-batch:1.5.2.RELEASE')
    testCompile('org.springframework.boot:spring-boot-starter-test:1.5.2.RELEASE')
}

buildscript {
    repositories {
        mavenLocal()
        jcenter()
    }
    dependencies {
        classpath "org.springframework.boot:spring-boot-gradle-plugin:1.5.2.RELEASE"
    }
}
application.properties file
#Empty
Spring Batch Jobs
CSV files
1,facebook.com
2,yahoo.com
3,google.com

200,walkingtechie.blogspot.com
300,stackoverflow.com
400,oracle.com

999,eclipse.org
888,baidu.com
777,twitter.com
Create a job that reads from the CSV files and writes to a single CSV file. The listeners registered on the step are notified of the corresponding events and log them.
package com.walking.techie.jobs;

import com.walking.techie.listener.CustomChunkListener;
import com.walking.techie.listener.CustomItemReaderListener;
import com.walking.techie.listener.CustomItemWriterListener;
import com.walking.techie.listener.CustomProcessListener;
import com.walking.techie.listener.CustomStepListener;
import com.walking.techie.model.Domain;
import com.walking.techie.processor.CustomItemProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;

@Slf4j
@Configuration
@EnableBatchProcessing
public class SpringBatchListener {

  @Autowired
  private JobBuilderFactory jobBuilderFactory;

  @Autowired
  private StepBuilderFactory stepBuilderFactory;

  // All input files matching this pattern are read by the job.
  @Value("csv/domain*.csv")
  private Resource[] resources;

  // The step1 bean defined below is injected as a method parameter.
  @Bean
  public Job readFiles(Step step1) {
    return jobBuilderFactory.get("readFiles")
        .incrementer(new RunIdIncrementer())
        .start(step1)
        .build();
  }

  // Chunk-oriented step: items are read, processed, and written in chunks of 5.
  @Bean
  public Step step1(CustomItemProcessor itemProcessor, CustomChunkListener chunkListener,
      CustomProcessListener processListener) {
    return stepBuilderFactory.get("step1")
        .<Domain, Domain>chunk(5)
        .reader(multiResourceItemReader())
        .processor(itemProcessor)
        .writer(writer())
        .listener(processListener)
        .listener(itemReaderListener())
        .listener(itemWriterListener())
        .listener(customStepListener())
        .listener(chunkListener)
        .build();
  }

  @Bean
  public CustomStepListener customStepListener() {
    return new CustomStepListener();
  }

  @Bean
  public CustomItemWriterListener itemWriterListener() {
    return new CustomItemWriterListener();
  }

  @Bean
  public CustomItemReaderListener itemReaderListener() {
    return new CustomItemReaderListener();
  }

  // Reads every matching resource in turn by handing it to the delegate reader.
  @Bean
  public MultiResourceItemReader<Domain> multiResourceItemReader() {
    log.info("Multi reader called");
    MultiResourceItemReader<Domain> resourceItemReader = new MultiResourceItemReader<>();
    resourceItemReader.setResources(resources);
    resourceItemReader.setDelegate(reader());
    return resourceItemReader;
  }

  // Maps each comma-separated line to a Domain object with the fields "id" and "domain".
  @Bean
  public FlatFileItemReader<Domain> reader() {
    log.info("reader called");
    FlatFileItemReader<Domain> reader = new FlatFileItemReader<>();
    reader.setLineMapper(new DefaultLineMapper<Domain>() {{
      setLineTokenizer(new DelimitedLineTokenizer() {{
        setNames(new String[]{"id", "domain"});
      }});
      setFieldSetMapper(new BeanWrapperFieldSetMapper<Domain>() {{
        setTargetType(Domain.class);
      }});
    }});
    return reader;
  }

  // Writes each Domain object as an "id,domain" line to a single output file.
  @Bean
  public FlatFileItemWriter<Domain> writer() {
    log.info("writer called");
    FlatFileItemWriter<Domain> writer = new FlatFileItemWriter<>();
    writer.setResource(new FileSystemResource("output/domain.all.csv"));
    writer.setAppendAllowed(true);
    writer.setLineAggregator(new DelimitedLineAggregator<Domain>() {{
      setDelimiter(",");
      setFieldExtractor(new BeanWrapperFieldExtractor<Domain>() {{
        setNames(new String[]{"id", "domain"});
      }});
    }});
    return writer;
  }
}
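The multi-resource reader in this configuration hands each input file in turn to a single delegate reader, so the step sees one continuous stream of items. The plain-Java sketch below models only that idea. MultiSourceReaderSketch and its read method are hypothetical names, in-memory lists stand in for files, and details such as resource ordering and restart state are left out.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Simplified model of reading several inputs as one stream; NOT Spring Batch code.
final class MultiSourceReaderSketch {

  private final Iterator<List<String>> sources; // stands in for the array of resources
  private Iterator<String> current;             // stands in for the delegate's open file

  MultiSourceReaderSketch(List<List<String>> sources) {
    this.sources = sources.iterator();
  }

  // Returns the next line across all sources, or null when every source is exhausted.
  String read() {
    while (current == null || !current.hasNext()) {
      if (!sources.hasNext()) {
        return null;
      }
      current = sources.next().iterator(); // "open" the next source
    }
    return current.next();
  }

  public static void main(String[] args) {
    MultiSourceReaderSketch reader = new MultiSourceReaderSketch(Arrays.asList(
        Arrays.asList("1,facebook.com", "2,yahoo.com"),
        Arrays.asList("200,walkingtechie.blogspot.com")));
    for (String line = reader.read(); line != null; line = reader.read()) {
      System.out.println(line);
    }
  }
}
```

Returning null at the end mirrors the convention that an item reader signals end of input with null, which is what ends the step.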
Each record from the CSV files is mapped to a Domain object, which is then written to the output CSV file.
A Java model class
package com.walking.techie.model;

import lombok.Data;
import lombok.ToString;

@Data
@ToString
public class Domain {

  int id;
  String domain;
}
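As a rough illustration of the mapping between a CSV line and this model, the plain-Java sketch below splits a line on the comma, fills a small Domain-like object, and joins the fields again for output. It only approximates what the tokenizer, field-set mapper, and line aggregator in the job configuration do together. CsvMappingSketch, DomainRow, parse, and format are hypothetical names, not Spring Batch APIs.

```java
// Simplified model of line-to-object mapping and back; NOT Spring Batch code.
final class CsvMappingSketch {

  // Stand-in for the Domain model (hypothetical, written without Lombok).
  static final class DomainRow {
    final int id;
    final String domain;

    DomainRow(int id, String domain) {
      this.id = id;
      this.domain = domain;
    }
  }

  // Roughly what tokenizing on "," and binding to the fields "id" and "domain" achieves.
  static DomainRow parse(String line) {
    String[] fields = line.split(",", -1);
    if (fields.length != 2) {
      throw new IllegalArgumentException(
          "Expected 2 fields but found " + fields.length + ": " + line);
    }
    return new DomainRow(Integer.parseInt(fields[0].trim()), fields[1].trim());
  }

  // Roughly what extracting "id" and "domain" and joining them with "," achieves.
  static String format(DomainRow row) {
    return row.id + "," + row.domain;
  }

  public static void main(String[] args) {
    System.out.println(format(parse("200,walkingtechie.blogspot.com")));
  }
}
```

Because the job reads and writes the same two fields in the same order, each output line is identical to its input line.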
CustomItemProcessor processes each item. In this example it only logs the item and returns it unchanged.
package com.walking.techie.processor;

import com.walking.techie.model.Domain;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CustomItemProcessor implements ItemProcessor<Domain, Domain> {

  @Override
  public Domain process(Domain item) throws Exception {
    log.info("Item is processing : " + item);
    return item;
  }
}
Below are the custom listeners that listen to the corresponding events.
CustomChunkListener.java
package com.walking.techie.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CustomChunkListener implements ChunkListener {

  @Override
  public void beforeChunk(ChunkContext context) {
    log.info("ChunkListener ---- before chunk called");
  }

  @Override
  public void afterChunk(ChunkContext context) {
    log.info("ChunkListener ---- after chunk called");
  }

  @Override
  public void afterChunkError(ChunkContext context) {
    log.info("ChunkListener ---- after chunk error called");
  }
}
CustomItemReaderListener.java
package com.walking.techie.listener;

import com.walking.techie.model.Domain;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CustomItemReaderListener implements ItemReadListener<Domain> {

  @Override
  public void beforeRead() {
    log.info("ItemReadListener ---- before read ");
  }

  @Override
  public void afterRead(Domain item) {
    log.info("ItemReadListener ---- after read ");
  }

  @Override
  public void onReadError(Exception ex) {
    log.info("ItemReadListener ---- exception");
  }
}
CustomItemWriterListener.java
package com.walking.techie.listener;

import com.walking.techie.model.Domain;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CustomItemWriterListener implements ItemWriteListener<Domain> {

  @Override
  public void beforeWrite(List<? extends Domain> items) {
    log.info("ItemWriteListener ---- before write" + items);
  }

  @Override
  public void afterWrite(List<? extends Domain> items) {
    log.info("ItemWriteListener ---- after write" + items);
  }

  @Override
  public void onWriteError(Exception exception, List<? extends Domain> items) {
    log.info("ItemWriteListener ---- exception" + items);
  }
}
CustomProcessListener.java
package com.walking.techie.listener;

import com.walking.techie.model.Domain;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemProcessListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CustomProcessListener implements ItemProcessListener<Domain, Domain> {

  @Override
  public void beforeProcess(Domain item) {
    log.info("ItemProcessListener ---- beforeProcess");
  }

  @Override
  public void afterProcess(Domain item, Domain result) {
    log.info("ItemProcessListener ---- afterProcess");
  }

  @Override
  public void onProcessError(Domain item, Exception e) {
    log.info("ItemProcessListener ---- onProcessError");
  }
}
CustomStepListener.java
package com.walking.techie.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CustomStepListener implements StepExecutionListener {

  @Override
  public void beforeStep(StepExecution stepExecution) {
    log.info("StepExecutionListener ---- before ");
  }

  @Override
  public ExitStatus afterStep(StepExecution stepExecution) {
    log.info("StepExecutionListener ---- after ");
    return null;
  }
}
Run Application
package com.walking.techie;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class Application {

  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }
}
Output
The application writes its output to output/domain.all.csv:
1,facebook.com
2,yahoo.com
3,google.com
200,walkingtechie.blogspot.com
300,stackoverflow.com
400,oracle.com
999,eclipse.org
888,baidu.com
777,twitter.com
Console output
2017-03-29 11:06:50.348 INFO 34473 --- [ main] c.w.techie.jobs.SpringBatchListener : Multi reader called
2017-03-29 11:06:50.349 INFO 34473 --- [ main] c.w.techie.jobs.SpringBatchListener : reader called
2017-03-29 11:06:50.363 INFO 34473 --- [ main] c.w.techie.jobs.SpringBatchListener : writer called
2017-03-29 11:06:50.613 INFO 34473 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2017-03-29 11:06:50.621 INFO 34473 --- [ main] o.s.b.a.b.JobLauncherCommandLineRunner : Running default command line with: []
2017-03-29 11:06:50.622 WARN 34473 --- [ main] o.s.b.c.c.a.DefaultBatchConfigurer : No datasource was provided...using a Map based JobRepository
2017-03-29 11:06:50.635 INFO 34473 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.
2017-03-29 11:06:50.672 INFO 34473 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=readFiles]] launched with the following parameters: [{run.id=1}]
2017-03-29 11:06:50.687 INFO 34473 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
2017-03-29 11:06:50.690 INFO 34473 --- [ main] c.w.techie.listener.CustomStepListener : StepExecutionListener ---- before
2017-03-29 11:06:50.699 INFO 34473 --- [ main] c.w.techie.listener.CustomChunkListener : ChunkListener ---- before chunk called
2017-03-29 11:06:50.701 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- before read
2017-03-29 11:06:50.705 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- after read
2017-03-29 11:06:50.705 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- before read
2017-03-29 11:06:50.705 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- after read
2017-03-29 11:06:50.705 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- before read
2017-03-29 11:06:50.706 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- after read
2017-03-29 11:06:50.706 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- before read
2017-03-29 11:06:50.708 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- after read
2017-03-29 11:06:50.708 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- before read
2017-03-29 11:06:50.708 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- after read
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.processor.CustomItemProcessor : Item is processing : Domain(id=1, domain=facebook.com)
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.processor.CustomItemProcessor : Item is processing : Domain(id=2, domain=yahoo.com)
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.processor.CustomItemProcessor : Item is processing : Domain(id=3, domain=google.com)
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.processor.CustomItemProcessor : Item is processing : Domain(id=200, domain=walkingtechie.blogspot.com)
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.processor.CustomItemProcessor : Item is processing : Domain(id=300, domain=stackoverflow.com)
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.710 INFO 34473 --- [ main] c.w.t.listener.CustomItemWriterListener : ItemWriteListener ---- before write[Domain(id=1, domain=facebook.com), Domain(id=2, domain=yahoo.com), Domain(id=3, domain=google.com), Domain(id=200, domain=walkingtechie.blogspot.com), Domain(id=300, domain=stackoverflow.com)]
2017-03-29 11:06:50.710 INFO 34473 --- [ main] c.w.t.listener.CustomItemWriterListener : ItemWriteListener ---- after write[Domain(id=1, domain=facebook.com), Domain(id=2, domain=yahoo.com), Domain(id=3, domain=google.com), Domain(id=200, domain=walkingtechie.blogspot.com), Domain(id=300, domain=stackoverflow.com)]
2017-03-29 11:06:50.714 INFO 34473 --- [ main] c.w.techie.listener.CustomChunkListener : ChunkListener ---- after chunk called
2017-03-29 11:06:50.714 INFO 34473 --- [ main] c.w.techie.listener.CustomChunkListener : ChunkListener ---- before chunk called
2017-03-29 11:06:50.714 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- before read
2017-03-29 11:06:50.715 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- after read
2017-03-29 11:06:50.715 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- before read
2017-03-29 11:06:50.715 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- after read
2017-03-29 11:06:50.715 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- before read
2017-03-29 11:06:50.715 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- after read
2017-03-29 11:06:50.715 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- before read
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- after read
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- before read
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.processor.CustomItemProcessor : Item is processing : Domain(id=400, domain=oracle.com)
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.processor.CustomItemProcessor : Item is processing : Domain(id=999, domain=eclipse.org)
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.processor.CustomItemProcessor : Item is processing : Domain(id=888, domain=baidu.com)
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.processor.CustomItemProcessor : Item is processing : Domain(id=777, domain=twitter.com)
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.listener.CustomItemWriterListener : ItemWriteListener ---- before write[Domain(id=400, domain=oracle.com), Domain(id=999, domain=eclipse.org), Domain(id=888, domain=baidu.com), Domain(id=777, domain=twitter.com)]
2017-03-29 11:06:50.717 INFO 34473 --- [ main] c.w.t.listener.CustomItemWriterListener : ItemWriteListener ---- after write[Domain(id=400, domain=oracle.com), Domain(id=999, domain=eclipse.org), Domain(id=888, domain=baidu.com), Domain(id=777, domain=twitter.com)]
2017-03-29 11:06:50.719 INFO 34473 --- [ main] c.w.techie.listener.CustomChunkListener : ChunkListener ---- after chunk called
2017-03-29 11:06:50.719 INFO 34473 --- [ main] c.w.techie.listener.CustomStepListener : StepExecutionListener ---- after
2017-03-29 11:06:50.726 INFO 34473 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=readFiles]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED]
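The counts in this log can be checked with a little arithmetic: 9 items with a chunk size of 5 give two chunks (5 items, then 4), and the reader is called 10 times because the last call finds no more input. The sketch below encodes that reasoning. ChunkCountSketch and its methods are hypothetical helpers, and the formulas assume a simple single-threaded step with no skips or retries, as in this example.

```java
// Back-of-the-envelope counts for a simple chunk-oriented step; NOT Spring Batch code.
final class ChunkCountSketch {

  // Each full chunk, plus one final chunk that runs into the end of the input.
  static int chunks(int items, int chunkSize) {
    return items / chunkSize + 1;
  }

  // One read per item, plus the final read that signals end of input.
  static int readAttempts(int items) {
    return items + 1;
  }

  // Items in the final chunk (0 when the items divide evenly into chunks).
  static int lastChunkSize(int items, int chunkSize) {
    return items % chunkSize;
  }

  public static void main(String[] args) {
    System.out.println("chunks        = " + chunks(9, 5));        // 2
    System.out.println("read attempts = " + readAttempts(9));     // 10
    System.out.println("last chunk    = " + lastChunkSize(9, 5)); // 4
  }
}
```

These values line up with the log: two "before chunk" entries, ten "before read" entries against nine "after read" entries, and a second write containing four items.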
Note: This code has been compiled and run on a Mac notebook using IntelliJ IDEA.