Spring Batch Example in Spring boot - Spring Batch listeners example - Walking Techie

Blog about Java programming, Design Pattern, and Data Structure.

Wednesday, March 29, 2017

Spring Batch Example in Spring boot - Spring Batch listeners example

In Spring Batch, there are seven interfaces to intercept step execution.

  1. ChunkListener
  2. ChunkListener is an interface for lifecycle of a chunk. A chunk can be through of as a collection of items that will be committed together.
  3. ItemProcessListener
  4. This interface will listen for processing of an item. Implementation of this interface will be notified before and after item is passed to the ItemProcessor and in the event of any exceptions thrown by the processor.
  5. ItemReadListener
  6. Listener interface around the reading of an item.
  7. ItemWriteListener
  8. Listener interface for the writing of items. Implementations of this interface will be notified before, after, and in case of any exception thrown while writing a list of items.
  9. RetryListener
  10. Interface used internally by RetryListener adapters to provide consistent naming. Extends StepListener to allow registration with existing listener methods.
  11. SkipListener
  12. Interface for listener to skipped items. Callbacks will be called by Step implementations at the appropriate time in the step lifecycle.
  13. StepExecutionListener
  14. Listener interface for the lifecycle of a Step.

Project structure

This is a directory structure of the standard gradle project.

spring batch listeners example project structure

Project dependencies

task wrapper(type: Wrapper) {
    gradleVersion = '3.2.1'
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'

sourceCompatibility = 1.8

repositories {
    mavenLocal()
    mavenCentral()
}


dependencies {
    compileOnly('org.projectlombok:lombok:1.16.12')
    compile('org.springframework.boot:spring-boot-starter-batch:1.5.2.RELEASE')
    testCompile('org.springframework.boot:spring-boot-starter-test:1.5.2.RELEASE')
}
buildscript {
    repositories {
        mavenLocal()
        jcenter()
    }
    dependencies {
        classpath "org.springframework.boot:spring-boot-gradle-plugin:1.5.2.RELEASE"
    }
}

application.properties file

#Empty

Spring Batch Jobs

CSV files

1,facebook.com
2,yahoo.com
3,google.com
200,walkingtechie.blogspot.com
300,stackoverflow.com
400,oracle.com
999,eclipse.org
888,baidu.com
777,twitter.com

Create a job which will read from CSV files and write into CSV files. Listener in this job listen to corresponding events and notify.

package com.walking.techie.jobs;

import com.walking.techie.listener.CustomChunkListener;
import com.walking.techie.listener.CustomItemReaderListener;
import com.walking.techie.listener.CustomItemWriterListener;
import com.walking.techie.listener.CustomProcessListener;
import com.walking.techie.listener.CustomStepListener;
import com.walking.techie.model.Domain;
import com.walking.techie.processor.CustomItemProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;

@Slf4j
@Configuration
@EnableBatchProcessing
public class SpringBatchListener {

  @Autowired
  private JobBuilderFactory jobBuilderFactory;
  @Autowired
  private StepBuilderFactory stepBuilderFactory;

  @Value("csv/domain*.csv")
  private Resource[] resources;

  @Bean
  public Job readFiles() {
    return jobBuilderFactory.get("readFiles").incrementer(new RunIdIncrementer()).
        start(step1(null, null, null)).build();
  }

  @Bean
  public Step step1(CustomItemProcessor itemProcessor, CustomChunkListener chunkListener,
      CustomProcessListener processListener) {
    return stepBuilderFactory.get("step1").<Domain, Domain>chunk(5)
        .reader(multiResourceItemReader()).processor(itemProcessor).writer(writer())
        .listener(processListener).listener(itemReaderListener())
        .listener(itemWriterListener()).listener(customStepListener()).listener(chunkListener)
        .build();
  }

  @Bean
  public CustomStepListener customStepListener() {
    return new CustomStepListener();
  }

  @Bean
  public CustomItemWriterListener itemWriterListener() {
    return new CustomItemWriterListener();
  }

  @Bean
  public CustomItemReaderListener itemReaderListener() {
    return new CustomItemReaderListener();
  }

  @Bean
  public MultiResourceItemReader<Domain> multiResourceItemReader() {
    log.info("Multi reader called");
    MultiResourceItemReader<Domain> resourceItemReader = new MultiResourceItemReader<Domain>();
    resourceItemReader.setResources(resources);
    resourceItemReader.setDelegate(reader());
    return resourceItemReader;
  }

  @Bean
  public FlatFileItemReader<Domain> reader() {
    log.info("reader called");
    FlatFileItemReader<Domain> reader = new FlatFileItemReader<Domain>();
    reader.setLineMapper(new DefaultLineMapper() {{
      setLineTokenizer(new DelimitedLineTokenizer() {{
        setNames(new String[]{"id", "domain"});
      }});
      setFieldSetMapper(new BeanWrapperFieldSetMapper<Domain>() {{
        setTargetType(Domain.class);
      }});
    }});
    return reader;
  }

  @Bean
  public FlatFileItemWriter<Domain> writer() {
    log.info("writer called");
    FlatFileItemWriter<Domain> writer = new FlatFileItemWriter<>();
    writer.setResource(new FileSystemResource("output/domain.all.csv"));
    writer.setAppendAllowed(true);
    writer.setLineAggregator(new DelimitedLineAggregator<Domain>() {{
      setDelimiter(",");
      setFieldExtractor(new BeanWrapperFieldExtractor<Domain>() {{
        setNames(new String[]{"id", "domain"});
      }});
    }});
    return writer;
  }
}

Map record from CSV file Domain object and write to CSV file.

A Java model class

package com.walking.techie.model;

import lombok.Data;
import lombok.ToString;

@Data
@ToString
public class Domain {

  int id;
  String domain;
}

CustomItemProcessor process each and every item.

package com.walking.techie.processor;

import com.walking.techie.model.Domain;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CustomItemProcessor implements ItemProcessor<Domain, Domain> {

  @Override
  public Domain process(Domain item) throws Exception {
    log.info("Item is processing : " + item);
    return item;
  }
}

Below is the list of custom listener that will listen to corresponding events.

CustomChunkListener.java
package com.walking.techie.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CustomChunkListener implements ChunkListener {

  @Override
  public void beforeChunk(ChunkContext context) {
    log.info("ChunkListener ---- before chunk called");
  }

  @Override
  public void afterChunk(ChunkContext context) {
    log.info("ChunkListener ---- after chunk called");
  }

  @Override
  public void afterChunkError(ChunkContext context) {
    log.info("ChunkListener ---- after chunk error called");
  }
}
CustomItemReaderListener.java
package com.walking.techie.listener;

import com.walking.techie.model.Domain;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CustomItemReaderListener implements ItemReadListener<Domain> {

  @Override
  public void beforeRead() {
    log.info("ItemReadListener ---- before read ");
  }

  @Override
  public void afterRead(Domain item) {
    log.info("ItemReadListener ---- after read ");
  }

  @Override
  public void onReadError(Exception ex) {
    log.info("ItemReadListener ---- exception");
  }
}
CustomItemWriterListener.java
package com.walking.techie.listener;

import com.walking.techie.model.Domain;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CustomItemWriterListener implements ItemWriteListener<Domain> {

  @Override
  public void beforeWrite(List<? extends Domain> items) {
    log.info("ItemWriteListener ---- before write" + items);
  }

  @Override
  public void afterWrite(List<? extends Domain> items) {
    log.info("ItemWriteListener ---- after write" + items);

  }

  @Override
  public void onWriteError(Exception exception, List<? extends Domain> items) {
    log.info("ItemWriteListener ---- exception" + items);
  }
}
CustomProcessListener.java
package com.walking.techie.listener;

import com.walking.techie.model.Domain;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemProcessListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CustomProcessListener implements ItemProcessListener<Domain, Domain> {

  @Override
  public void beforeProcess(Domain item) {
    log.info("ItemProcessListener ---- beforeProcess");
  }

  @Override
  public void afterProcess(Domain item, Domain result) {
    log.info("ItemProcessListener ---- afterProcess");
  }

  @Override
  public void onProcessError(Domain item, Exception e) {
    log.info("ItemProcessListener ---- onProcessError");
  }
}
CustomStepListener.java
package com.walking.techie.listener;


import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CustomStepListener implements StepExecutionListener {

  @Override
  public void beforeStep(StepExecution stepExecution) {
    log.info("StepExecutionListener  ---- before  ");
  }

  @Override
  public ExitStatus afterStep(StepExecution stepExecution) {
    log.info("StepExecutionListener  ---- after  ");
    return null;
  }
}

Run Application

package com.walking.techie;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class Application {

  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }
}

Output

Output of the application will store in output/domain.all.csv

1,facebook.com
2,yahoo.com
3,google.com
200,walkingtechie.blogspot.com
300,stackoverflow.com
400,oracle.com
999,eclipse.org
888,baidu.com
777,twitter.com

output on console

2017-03-29 11:06:50.348  INFO 34473 --- [           main] c.w.techie.jobs.SpringBatchListener      : Multi reader called
2017-03-29 11:06:50.349  INFO 34473 --- [           main] c.w.techie.jobs.SpringBatchListener      : reader called
2017-03-29 11:06:50.363  INFO 34473 --- [           main] c.w.techie.jobs.SpringBatchListener      : writer called
2017-03-29 11:06:50.613  INFO 34473 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2017-03-29 11:06:50.621  INFO 34473 --- [           main] o.s.b.a.b.JobLauncherCommandLineRunner   : Running default command line with: []
2017-03-29 11:06:50.622  WARN 34473 --- [           main] o.s.b.c.c.a.DefaultBatchConfigurer       : No datasource was provided...using a Map based JobRepository
2017-03-29 11:06:50.635  INFO 34473 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : No TaskExecutor has been set, defaulting to synchronous executor.
2017-03-29 11:06:50.672  INFO 34473 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=readFiles]] launched with the following parameters: [{run.id=1}]
2017-03-29 11:06:50.687  INFO 34473 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
2017-03-29 11:06:50.690  INFO 34473 --- [           main] c.w.techie.listener.CustomStepListener   : StepExecutionListener  ---- before
2017-03-29 11:06:50.699  INFO 34473 --- [           main] c.w.techie.listener.CustomChunkListener  : ChunkListener ---- before chunk called
2017-03-29 11:06:50.701  INFO 34473 --- [           main] c.w.t.listener.CustomItemReaderListener  : ItemReadListener ---- before read
2017-03-29 11:06:50.705  INFO 34473 --- [           main] c.w.t.listener.CustomItemReaderListener  : ItemReadListener ---- after read
2017-03-29 11:06:50.705  INFO 34473 --- [           main] c.w.t.listener.CustomItemReaderListener  : ItemReadListener ---- before read
2017-03-29 11:06:50.705  INFO 34473 --- [           main] c.w.t.listener.CustomItemReaderListener  : ItemReadListener ---- after read
2017-03-29 11:06:50.705  INFO 34473 --- [           main] c.w.t.listener.CustomItemReaderListener  : ItemReadListener ---- before read
2017-03-29 11:06:50.706  INFO 34473 --- [           main] c.w.t.listener.CustomItemReaderListener  : ItemReadListener ---- after read
2017-03-29 11:06:50.706  INFO 34473 --- [           main] c.w.t.listener.CustomItemReaderListener  : ItemReadListener ---- before read
2017-03-29 11:06:50.708  INFO 34473 --- [           main] c.w.t.listener.CustomItemReaderListener  : ItemReadListener ---- after read
2017-03-29 11:06:50.708  INFO 34473 --- [           main] c.w.t.listener.CustomItemReaderListener  : ItemReadListener ---- before read
2017-03-29 11:06:50.708  INFO 34473 --- [           main] c.w.t.listener.CustomItemReaderListener  : ItemReadListener ---- after read
2017-03-29 11:06:50.709  INFO 34473 --- [           main] c.w.t.listener.CustomProcessListener     : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.709  INFO 34473 --- [           main] c.w.t.processor.CustomItemProcessor      : Item is processing : Domain(id=1, domain=facebook.com)
2017-03-29 11:06:50.709  INFO 34473 --- [           main] c.w.t.listener.CustomProcessListener     : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.709  INFO 34473 --- [           main] c.w.t.listener.CustomProcessListener     : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.709  INFO 34473 --- [           main] c.w.t.processor.CustomItemProcessor      : Item is processing : Domain(id=2, domain=yahoo.com)
2017-03-29 11:06:50.709  INFO 34473 --- [           main] c.w.t.listener.CustomProcessListener     : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.709  INFO 34473 --- [           main] c.w.t.listener.CustomProcessListener     : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.709  INFO 34473 --- [           main] c.w.t.processor.CustomItemProcessor      : Item is processing : Domain(id=3, domain=google.com)
2017-03-29 11:06:50.709  INFO 34473 --- [           main] c.w.t.listener.CustomProcessListener     : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.709  INFO 34473 --- [           main] c.w.t.listener.CustomProcessListener     : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.709  INFO 34473 --- [           main] c.w.t.processor.CustomItemProcessor      : Item is processing : Domain(id=200, domain=walkingtechie.blogspot.com)
2017-03-29 11:06:50.709  INFO 34473 --- [           main] c.w.t.listener.CustomProcessListener     : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.709  INFO 34473 --- [           main] c.w.t.listener.CustomProcessListener     : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.709  INFO 34473 --- [           main] c.w.t.processor.CustomItemProcessor      : Item is processing : Domain(id=300, domain=stackoverflow.com)
2017-03-29 11:06:50.709  INFO 34473 --- [           main] c.w.t.listener.CustomProcessListener     : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.710  INFO 34473 --- [           main] c.w.t.listener.CustomItemWriterListener  : ItemWriteListener ---- before write[Domain(id=1, domain=facebook.com), Domain(id=2, domain=yahoo.com), Domain(id=3, domain=google.com), Domain(id=200, domain=walkingtechie.blogspot.com), Domain(id=300, domain=stackoverflow.com)]
2017-03-29 11:06:50.710  INFO 34473 --- [           main] c.w.t.listener.CustomItemWriterListener  : ItemWriteListener ---- after write[Domain(id=1, domain=facebook.com), Domain(id=2, domain=yahoo.com), Domain(id=3, domain=google.com), Domain(id=200, domain=walkingtechie.blogspot.com), Domain(id=300, domain=stackoverflow.com)]
2017-03-29 11:06:50.714  INFO 34473 --- [           main] c.w.techie.listener.CustomChunkListener  : ChunkListener ---- after chunk called
2017-03-29 11:06:50.714  INFO 34473 --- [           main] c.w.techie.listener.CustomChunkListener  : ChunkListener ---- before chunk called
2017-03-29 11:06:50.714  INFO 34473 --- [           main] c.w.t.listener.CustomItemReaderListener  : ItemReadListener ---- before read
2017-03-29 11:06:50.715  INFO 34473 --- [           main] c.w.t.listener.CustomItemReaderListener  : ItemReadListener ---- after read
2017-03-29 11:06:50.715  INFO 34473 --- [           main] c.w.t.listener.CustomItemReaderListener  : ItemReadListener ---- before read
2017-03-29 11:06:50.715  INFO 34473 --- [           main] c.w.t.listener.CustomItemReaderListener  : ItemReadListener ---- after read
2017-03-29 11:06:50.715  INFO 34473 --- [           main] c.w.t.listener.CustomItemReaderListener  : ItemReadListener ---- before read
2017-03-29 11:06:50.715  INFO 34473 --- [           main] c.w.t.listener.CustomItemReaderListener  : ItemReadListener ---- after read
2017-03-29 11:06:50.715  INFO 34473 --- [           main] c.w.t.listener.CustomItemReaderListener  : ItemReadListener ---- before read
2017-03-29 11:06:50.716  INFO 34473 --- [           main] c.w.t.listener.CustomItemReaderListener  : ItemReadListener ---- after read
2017-03-29 11:06:50.716  INFO 34473 --- [           main] c.w.t.listener.CustomItemReaderListener  : ItemReadListener ---- before read
2017-03-29 11:06:50.716  INFO 34473 --- [           main] c.w.t.listener.CustomProcessListener     : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.716  INFO 34473 --- [           main] c.w.t.processor.CustomItemProcessor      : Item is processing : Domain(id=400, domain=oracle.com)
2017-03-29 11:06:50.716  INFO 34473 --- [           main] c.w.t.listener.CustomProcessListener     : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.716  INFO 34473 --- [           main] c.w.t.listener.CustomProcessListener     : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.716  INFO 34473 --- [           main] c.w.t.processor.CustomItemProcessor      : Item is processing : Domain(id=999, domain=eclipse.org)
2017-03-29 11:06:50.716  INFO 34473 --- [           main] c.w.t.listener.CustomProcessListener     : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.716  INFO 34473 --- [           main] c.w.t.listener.CustomProcessListener     : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.716  INFO 34473 --- [           main] c.w.t.processor.CustomItemProcessor      : Item is processing : Domain(id=888, domain=baidu.com)
2017-03-29 11:06:50.716  INFO 34473 --- [           main] c.w.t.listener.CustomProcessListener     : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.716  INFO 34473 --- [           main] c.w.t.listener.CustomProcessListener     : ItemProcessListener ---- beforeProcess
2017-03-29 11:06:50.716  INFO 34473 --- [           main] c.w.t.processor.CustomItemProcessor      : Item is processing : Domain(id=777, domain=twitter.com)
2017-03-29 11:06:50.716  INFO 34473 --- [           main] c.w.t.listener.CustomProcessListener     : ItemProcessListener ---- afterProcess
2017-03-29 11:06:50.716  INFO 34473 --- [           main] c.w.t.listener.CustomItemWriterListener  : ItemWriteListener ---- before write[Domain(id=400, domain=oracle.com), Domain(id=999, domain=eclipse.org), Domain(id=888, domain=baidu.com), Domain(id=777, domain=twitter.com)]
2017-03-29 11:06:50.717  INFO 34473 --- [           main] c.w.t.listener.CustomItemWriterListener  : ItemWriteListener ---- after write[Domain(id=400, domain=oracle.com), Domain(id=999, domain=eclipse.org), Domain(id=888, domain=baidu.com), Domain(id=777, domain=twitter.com)]
2017-03-29 11:06:50.719  INFO 34473 --- [           main] c.w.techie.listener.CustomChunkListener  : ChunkListener ---- after chunk called
2017-03-29 11:06:50.719  INFO 34473 --- [           main] c.w.techie.listener.CustomStepListener   : StepExecutionListener  ---- after
2017-03-29 11:06:50.726  INFO 34473 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=readFiles]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED]

Note : This code has been compiled and run on mac notebook and intellij IDEA.

No comments :

Post a Comment