In Spring Batch, there are seven interfaces to intercept step execution.
- ChunkListener
- ItemProcessListener
- ItemReadListener
- ItemWriteListener
- RetryListener
- SkipListener
- StepExecutionListener
ChunkListener is an interface for lifecycle of a chunk. A chunk can be through of as a collection of items that will be committed together.
This interface will listen for processing of an item. Implementation of this interface will be notified before and after item is passed to the ItemProcessor and in the event of any exceptions thrown by the processor.
Listener interface around the reading of an item.
Listener interface for the writing of items. Implementations of this interface will be notified before, after, and in case of any exception thrown while writing a list of items.
Interface used internally by RetryListener adapters to provide consistent naming. Extends StepListener to allow registration with existing listener methods.
Interface for listener to skipped items. Callbacks will be called by Step implementations at the appropriate time in the step lifecycle.
Listener interface for the lifecycle of a Step.
Project structure
This is a directory structure of the standard gradle project.
Project dependencies
task wrapper(type: Wrapper) {
gradleVersion = '3.2.1'
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
sourceCompatibility = 1.8
repositories {
mavenLocal()
mavenCentral()
}
dependencies {
compileOnly('org.projectlombok:lombok:1.16.12')
compile('org.springframework.boot:spring-boot-starter-batch:1.5.2.RELEASE')
testCompile('org.springframework.boot:spring-boot-starter-test:1.5.2.RELEASE')
}
buildscript {
repositories {
mavenLocal()
jcenter()
}
dependencies {
classpath "org.springframework.boot:spring-boot-gradle-plugin:1.5.2.RELEASE"
}
}
application.properties file
#Empty
Spring Batch Jobs
CSV files
1,facebook.com 2,yahoo.com 3,google.com
200,walkingtechie.blogspot.com 300,stackoverflow.com 400,oracle.com
999,eclipse.org 888,baidu.com 777,twitter.com
Create a job which will read from CSV files and write into CSV files. Listener in this job listen to corresponding events and notify.
package com.walking.techie.jobs;
import com.walking.techie.listener.CustomChunkListener;
import com.walking.techie.listener.CustomItemReaderListener;
import com.walking.techie.listener.CustomItemWriterListener;
import com.walking.techie.listener.CustomProcessListener;
import com.walking.techie.listener.CustomStepListener;
import com.walking.techie.model.Domain;
import com.walking.techie.processor.CustomItemProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
@Slf4j
@Configuration
@EnableBatchProcessing
public class SpringBatchListener {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Value("csv/domain*.csv")
private Resource[] resources;
@Bean
public Job readFiles() {
return jobBuilderFactory.get("readFiles").incrementer(new RunIdIncrementer()).
start(step1(null, null, null)).build();
}
@Bean
public Step step1(CustomItemProcessor itemProcessor, CustomChunkListener chunkListener,
CustomProcessListener processListener) {
return stepBuilderFactory.get("step1").<Domain, Domain>chunk(5)
.reader(multiResourceItemReader()).processor(itemProcessor).writer(writer())
.listener(processListener).listener(itemReaderListener())
.listener(itemWriterListener()).listener(customStepListener()).listener(chunkListener)
.build();
}
@Bean
public CustomStepListener customStepListener() {
return new CustomStepListener();
}
@Bean
public CustomItemWriterListener itemWriterListener() {
return new CustomItemWriterListener();
}
@Bean
public CustomItemReaderListener itemReaderListener() {
return new CustomItemReaderListener();
}
@Bean
public MultiResourceItemReader<Domain> multiResourceItemReader() {
log.info("Multi reader called");
MultiResourceItemReader<Domain> resourceItemReader = new MultiResourceItemReader<Domain>();
resourceItemReader.setResources(resources);
resourceItemReader.setDelegate(reader());
return resourceItemReader;
}
@Bean
public FlatFileItemReader<Domain> reader() {
log.info("reader called");
FlatFileItemReader<Domain> reader = new FlatFileItemReader<Domain>();
reader.setLineMapper(new DefaultLineMapper() {{
setLineTokenizer(new DelimitedLineTokenizer() {{
setNames(new String[]{"id", "domain"});
}});
setFieldSetMapper(new BeanWrapperFieldSetMapper<Domain>() {{
setTargetType(Domain.class);
}});
}});
return reader;
}
@Bean
public FlatFileItemWriter<Domain> writer() {
log.info("writer called");
FlatFileItemWriter<Domain> writer = new FlatFileItemWriter<>();
writer.setResource(new FileSystemResource("output/domain.all.csv"));
writer.setAppendAllowed(true);
writer.setLineAggregator(new DelimitedLineAggregator<Domain>() {{
setDelimiter(",");
setFieldExtractor(new BeanWrapperFieldExtractor<Domain>() {{
setNames(new String[]{"id", "domain"});
}});
}});
return writer;
}
}
Map record from CSV file Domain object and write to CSV file.
A Java model class
package com.walking.techie.model;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class Domain {
int id;
String domain;
}
CustomItemProcessor process each and every item.
package com.walking.techie.processor;
import com.walking.techie.model.Domain;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class CustomItemProcessor implements ItemProcessor<Domain, Domain> {
@Override
public Domain process(Domain item) throws Exception {
log.info("Item is processing : " + item);
return item;
}
}
Below is the list of custom listener that will listen to corresponding events.
CustomChunkListener.java
package com.walking.techie.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class CustomChunkListener implements ChunkListener {
@Override
public void beforeChunk(ChunkContext context) {
log.info("ChunkListener ---- before chunk called");
}
@Override
public void afterChunk(ChunkContext context) {
log.info("ChunkListener ---- after chunk called");
}
@Override
public void afterChunkError(ChunkContext context) {
log.info("ChunkListener ---- after chunk error called");
}
}
CustomItemReaderListener.java
package com.walking.techie.listener;
import com.walking.techie.model.Domain;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class CustomItemReaderListener implements ItemReadListener<Domain> {
@Override
public void beforeRead() {
log.info("ItemReadListener ---- before read ");
}
@Override
public void afterRead(Domain item) {
log.info("ItemReadListener ---- after read ");
}
@Override
public void onReadError(Exception ex) {
log.info("ItemReadListener ---- exception");
}
}
CustomItemWriterListener.java
package com.walking.techie.listener;
import com.walking.techie.model.Domain;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class CustomItemWriterListener implements ItemWriteListener<Domain> {
@Override
public void beforeWrite(List<? extends Domain> items) {
log.info("ItemWriteListener ---- before write" + items);
}
@Override
public void afterWrite(List<? extends Domain> items) {
log.info("ItemWriteListener ---- after write" + items);
}
@Override
public void onWriteError(Exception exception, List<? extends Domain> items) {
log.info("ItemWriteListener ---- exception" + items);
}
}
CustomProcessListener.java
package com.walking.techie.listener;
import com.walking.techie.model.Domain;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemProcessListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class CustomProcessListener implements ItemProcessListener<Domain, Domain> {
@Override
public void beforeProcess(Domain item) {
log.info("ItemProcessListener ---- beforeProcess");
}
@Override
public void afterProcess(Domain item, Domain result) {
log.info("ItemProcessListener ---- afterProcess");
}
@Override
public void onProcessError(Domain item, Exception e) {
log.info("ItemProcessListener ---- onProcessError");
}
}
CustomStepListener.java
package com.walking.techie.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class CustomStepListener implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
log.info("StepExecutionListener ---- before ");
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
log.info("StepExecutionListener ---- after ");
return null;
}
}
Run Application
package com.walking.techie;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
Output
Output of the application will store in output/domain.all.csv
1,facebook.com 2,yahoo.com 3,google.com 200,walkingtechie.blogspot.com 300,stackoverflow.com 400,oracle.com 999,eclipse.org 888,baidu.com 777,twitter.com
output on console
2017-03-29 11:06:50.348 INFO 34473 --- [ main] c.w.techie.jobs.SpringBatchListener : Multi reader called
2017-03-29 11:06:50.349 INFO 34473 --- [ main] c.w.techie.jobs.SpringBatchListener : reader called
2017-03-29 11:06:50.363 INFO 34473 --- [ main] c.w.techie.jobs.SpringBatchListener : writer called
2017-03-29 11:06:50.613 INFO 34473 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2017-03-29 11:06:50.621 INFO 34473 --- [ main] o.s.b.a.b.JobLauncherCommandLineRunner : Running default command line with: []
2017-03-29 11:06:50.622 WARN 34473 --- [ main] o.s.b.c.c.a.DefaultBatchConfigurer : No datasource was provided...using a Map based JobRepository
2017-03-29 11:06:50.635 INFO 34473 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.
2017-03-29 11:06:50.672 INFO 34473 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=readFiles]] launched with the following parameters: [{run.id=1}]
2017-03-29 11:06:50.687 INFO 34473 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
2017-03-29 11:06:50.690 INFO 34473 --- [ main] c.w.techie.listener.CustomStepListener : StepExecutionListener ---- before
2017-03-29 11:06:50.699 INFO 34473 --- [ main] c.w.techie.listener.CustomChunkListener : ChunkListener ---- before chunk called
2017-03-29 11:06:50.701 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- before read
2017-03-29 11:06:50.705 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- after read
2017-03-29 11:06:50.705 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- before read
2017-03-29 11:06:50.705 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- after read
2017-03-29 11:06:50.705 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- before read
2017-03-29 11:06:50.706 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- after read
2017-03-29 11:06:50.706 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- before read
2017-03-29 11:06:50.708 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- after read
2017-03-29 11:06:50.708 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- before read
2017-03-29 11:06:50.708 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- after read
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.processor.CustomItemProcessor : Item is processing : Domain(id=1, domain=facebook.com)
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.processor.CustomItemProcessor : Item is processing : Domain(id=2, domain=yahoo.com)
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.processor.CustomItemProcessor : Item is processing : Domain(id=3, domain=google.com)
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.processor.CustomItemProcessor : Item is processing : Domain(id=200, domain=walkingtechie.blogspot.com)
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.processor.CustomItemProcessor : Item is processing : Domain(id=300, domain=stackoverflow.com)
2017-03-29 11:06:50.709 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.710 INFO 34473 --- [ main] c.w.t.listener.CustomItemWriterListener : ItemWriteListener ---- before write[Domain(id=1, domain=facebook.com), Domain(id=2, domain=yahoo.com), Domain(id=3, domain=google.com), Domain(id=200, domain=walkingtechie.blogspot.com), Domain(id=300, domain=stackoverflow.com)]
2017-03-29 11:06:50.710 INFO 34473 --- [ main] c.w.t.listener.CustomItemWriterListener : ItemWriteListener ---- after write[Domain(id=1, domain=facebook.com), Domain(id=2, domain=yahoo.com), Domain(id=3, domain=google.com), Domain(id=200, domain=walkingtechie.blogspot.com), Domain(id=300, domain=stackoverflow.com)]
2017-03-29 11:06:50.714 INFO 34473 --- [ main] c.w.techie.listener.CustomChunkListener : ChunkListener ---- after chunk called
2017-03-29 11:06:50.714 INFO 34473 --- [ main] c.w.techie.listener.CustomChunkListener : ChunkListener ---- before chunk called
2017-03-29 11:06:50.714 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- before read
2017-03-29 11:06:50.715 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- after read
2017-03-29 11:06:50.715 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- before read
2017-03-29 11:06:50.715 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- after read
2017-03-29 11:06:50.715 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- before read
2017-03-29 11:06:50.715 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- after read
2017-03-29 11:06:50.715 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- before read
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- after read
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.listener.CustomItemReaderListener : ItemReadListener ---- before read
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.processor.CustomItemProcessor : Item is processing : Domain(id=400, domain=oracle.com)
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.processor.CustomItemProcessor : Item is processing : Domain(id=999, domain=eclipse.org)
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.processor.CustomItemProcessor : Item is processing : Domain(id=888, domain=baidu.com)
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.processor.CustomItemProcessor : Item is processing : Domain(id=777, domain=twitter.com)
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.listener.CustomProcessListener : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.716 INFO 34473 --- [ main] c.w.t.listener.CustomItemWriterListener : ItemWriteListener ---- before write[Domain(id=400, domain=oracle.com), Domain(id=999, domain=eclipse.org), Domain(id=888, domain=baidu.com), Domain(id=777, domain=twitter.com)]
2017-03-29 11:06:50.717 INFO 34473 --- [ main] c.w.t.listener.CustomItemWriterListener : ItemWriteListener ---- after write[Domain(id=400, domain=oracle.com), Domain(id=999, domain=eclipse.org), Domain(id=888, domain=baidu.com), Domain(id=777, domain=twitter.com)]
2017-03-29 11:06:50.719 INFO 34473 --- [ main] c.w.techie.listener.CustomChunkListener : ChunkListener ---- after chunk called
2017-03-29 11:06:50.719 INFO 34473 --- [ main] c.w.techie.listener.CustomStepListener : StepExecutionListener ---- after
2017-03-29 11:06:50.726 INFO 34473 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=readFiles]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED]
Note : This code has been compiled and run on mac notebook and intellij IDEA.
Recommended Posts:
- Spring Batch Hello world example using Spring boot
- Spring Boot - Remove _class field from MongoDB document
- Spring Boot Feature
- CSV Files with different delimiter to Mongo Database
- Using multiple data sources of MongoDB with Spring Boot and Spring Data
- Hash table in Java
- Enum Map in Java
- Identity Hash Map in Java
- Hash Map in Java example
- Enum Set in Java
No comments :
Post a Comment