In Spring Batch, partitioning means using multiple threads, each processing its own range of data. For example, suppose you have 100 records in a table, with primary ids assigned from 1 to 100, and you want to process all 100 of them.

All of the Spring Batch examples discussed so far have been single-threaded. Say processing records 1 to 100 takes 1 minute in the single-threaded case:
Single Thread -- process records from 1 to 100
With partitioning, we can start 10 (slave) threads that process 10 records each, so each thread takes only about 6 seconds:
Thread 1 -- process records from 1 to 10
Thread 2 -- process records from 11 to 20
Thread 3 -- process records from 21 to 30
Thread 4 -- process records from 31 to 40
Thread 5 -- process records from 41 to 50
Thread 6 -- process records from 51 to 60
..........
..........
Thread 9 -- process records from 81 to 90
Thread 10 -- process records from 91 to 100
Project structure
This is the directory structure of a standard Gradle project.
Project dependencies
In this tutorial we will discuss how to create a partitioned job with 10 threads, where each thread reads data from the database for its assigned range of ids.

The user table has 100 records that look like the following:
id, username, password, age
1,santosh,password,30
2,santosh,password,24
3,santosh,password,22
.........
.........
.........
task wrapper(type: Wrapper) {
    gradleVersion = '3.2.1'
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'

sourceCompatibility = 1.8

repositories {
    mavenLocal()
    mavenCentral()
}

dependencies {
    compileOnly('org.projectlombok:lombok:1.16.12')
    compile('org.springframework.boot:spring-boot-starter-batch:1.5.2.RELEASE')
    testCompile('org.springframework.boot:spring-boot-starter-test:1.5.2.RELEASE')
}

buildscript {
    repositories {
        mavenLocal()
        jcenter()
    }
    dependencies {
        classpath "org.springframework.boot:spring-boot-gradle-plugin:1.5.2.RELEASE"
    }
}
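Note that the dependency list above declares no MySQL driver or JPA starter, yet the console output later in this post shows Hibernate and a MySQL dialect in use, so the full project presumably declares them as well. A hedged guess at the missing lines:

dependencies {
    // assumed additions, not shown in the snippet above
    compile('org.springframework.boot:spring-boot-starter-data-jpa:1.5.2.RELEASE')
    runtime('mysql:mysql-connector-java')
}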
application.properties file
#empty
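The properties file in the sample project is empty, so Spring Boot defaults apply. Since the job reads a MySQL user table (see the console output below), the datasource has to be configured somewhere. A minimal sketch of what that could look like, where the URL, username, and password are assumptions for a local install:

# assumed local MySQL settings, not part of the original post
spring.datasource.url=jdbc:mysql://localhost:3306/spring_batch
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.jdbc.Driver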
Spring Batch Jobs
In PartitionerJob:

- In TaskExecutorPartitionHandler, we set the grid size, which is effectively the number of threads.
- For slaveReader, the #{stepExecutionContext[fromId]}, #{stepExecutionContext[toId]}, and #{stepExecutionContext[name]} values are injected from the ExecutionContext that rangePartitioner populates.
- For the writers, each thread writes its records to a different CSV file, with the filename format users.processed[fromId]-[toId].csv.
package com.walking.techie.jobs;

import com.walking.techie.model.User;
import com.walking.techie.partition.RangePartitioner;
import com.walking.techie.processor.UserProcessor;
import com.walking.techie.tasklet.DummyTasklet;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.PagingQueryProvider;
import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.jdbc.core.BeanPropertyRowMapper;

@Slf4j
@Configuration
@EnableBatchProcessing
public class PartitionerJob {

  @Autowired
  private JobBuilderFactory jobBuilderFactory;

  @Autowired
  private StepBuilderFactory stepBuilderFactory;

  @Autowired
  private DataSource dataSource;

  @Bean
  public Job PartitionJob() {
    // the partitioned master step runs first, then the dummy tasklet step
    return jobBuilderFactory.get("partitionJob").incrementer(new RunIdIncrementer())
        .start(masterStep()).next(step2()).build();
  }

  @Bean
  public Step step2() {
    return stepBuilderFactory.get("step2").tasklet(dummyTask()).build();
  }

  @Bean
  public DummyTasklet dummyTask() {
    return new DummyTasklet();
  }

  @Bean
  public Step masterStep() {
    // master step: splits the data via rangePartitioner and hands each
    // partition to a copy of the slave step
    return stepBuilderFactory.get("masterStep").partitioner(slave().getName(), rangePartitioner())
        .partitionHandler(masterSlaveHandler()).build();
  }

  @Bean
  public PartitionHandler masterSlaveHandler() {
    TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
    handler.setGridSize(10); // number of partitions/threads
    handler.setTaskExecutor(taskExecutor());
    handler.setStep(slave());
    try {
      handler.afterPropertiesSet();
    } catch (Exception e) {
      e.printStackTrace();
    }
    return handler;
  }

  @Bean(name = "slave")
  public Step slave() {
    log.info("...........called slave .........");
    // the null arguments are placeholders; the real values are injected
    // per partition because the reader/processor/writer are @StepScope
    return stepBuilderFactory.get("slave").<User, User>chunk(100)
        .reader(slaveReader(null, null, null))
        .processor(slaveProcessor(null)).writer(slaveWriter(null, null)).build();
  }

  @Bean
  public RangePartitioner rangePartitioner() {
    return new RangePartitioner();
  }

  @Bean
  public SimpleAsyncTaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor();
  }

  @Bean
  @StepScope
  public UserProcessor slaveProcessor(@Value("#{stepExecutionContext[name]}") String name) {
    log.info("********called slave processor **********");
    UserProcessor userProcessor = new UserProcessor();
    userProcessor.setThreadName(name);
    return userProcessor;
  }

  @Bean
  @StepScope
  public JdbcPagingItemReader<User> slaveReader(
      @Value("#{stepExecutionContext[fromId]}") final String fromId,
      @Value("#{stepExecutionContext[toId]}") final String toId,
      @Value("#{stepExecutionContext[name]}") final String name) {
    log.info("slaveReader start " + fromId + " " + toId);
    JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<>();
    reader.setDataSource(dataSource);
    reader.setQueryProvider(queryProvider());
    // bind this partition's range into the paging query
    Map<String, Object> parameterValues = new HashMap<>();
    parameterValues.put("fromId", fromId);
    parameterValues.put("toId", toId);
    log.info("Parameter Value " + name + " " + parameterValues);
    reader.setParameterValues(parameterValues);
    reader.setPageSize(1000);
    reader.setRowMapper(new BeanPropertyRowMapper<User>() {{
      setMappedClass(User.class);
    }});
    log.info("slaveReader end " + fromId + " " + toId);
    return reader;
  }

  @Bean
  public PagingQueryProvider queryProvider() {
    log.info("queryProvider start ");
    SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
    provider.setDataSource(dataSource);
    provider.setSelectClause("select id, username, password, age");
    provider.setFromClause("from user");
    provider.setWhereClause("where id >= :fromId and id <= :toId");
    provider.setSortKey("id");
    log.info("queryProvider end ");
    try {
      return provider.getObject();
    } catch (Exception e) {
      log.info("queryProvider exception ");
      e.printStackTrace();
    }
    return null;
  }

  @Bean
  @StepScope
  public FlatFileItemWriter<User> slaveWriter(
      @Value("#{stepExecutionContext[fromId]}") final String fromId,
      @Value("#{stepExecutionContext[toId]}") final String toId) {
    FlatFileItemWriter<User> writer = new FlatFileItemWriter<>();
    // each partition writes its own CSV file, named after its id range
    writer.setResource(new FileSystemResource(
        "csv/outputs/users.processed" + fromId + "-" + toId + ".csv"));
    // writer.setAppendAllowed(false);
    writer.setLineAggregator(new DelimitedLineAggregator<User>() {{
      setDelimiter(",");
      setFieldExtractor(new BeanWrapperFieldExtractor<User>() {{
        setNames(new String[]{"id", "username", "password", "age"});
      }});
    }});
    return writer;
  }
}
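A note on the executor: SimpleAsyncTaskExecutor creates a brand-new thread for every partition and never reuses threads. That is fine for 10 partitions, but for larger grid sizes you may want a bounded pool. A minimal sketch of swapping in a ThreadPoolTaskExecutor, where the pool sizes and thread-name prefix are assumptions rather than part of the original example:

// import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// Alternative to taskExecutor() above: a bounded, reusable pool instead of
// one new thread per partition. The container initializes the executor bean.
@Bean
public ThreadPoolTaskExecutor pooledTaskExecutor() {
  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  executor.setCorePoolSize(10); // assumption: one thread per partition
  executor.setMaxPoolSize(10);
  executor.setThreadNamePrefix("partition-");
  return executor;
}

You would then pass pooledTaskExecutor() to handler.setTaskExecutor(...) in masterSlaveHandler().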
A Java model class
package com.walking.techie.model;

import lombok.Data;

@Data
public class User {
  int id;
  String username;
  String password;
  int age;
}
First, create a Partitioner implementation that puts the partitioning range into the ExecutionContext. Later, the range can be fetched back from the ExecutionContext. In this case, the partitioning ranges look like the following:
Thread 1 = 1 - 10
Thread 2 = 11 - 20
Thread 3 = 21 - 30
......
Thread 10 = 91 - 100
package com.walking.techie.partition;

import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

@Slf4j
public class RangePartitioner implements Partitioner {

  @Override
  public Map<String, ExecutionContext> partition(int gridSize) {
    log.info("partition called gridsize= " + gridSize);
    Map<String, ExecutionContext> result = new HashMap<>();
    int range = 10;
    int fromId = 1;
    int toId = range;
    for (int i = 1; i <= gridSize; i++) {
      ExecutionContext value = new ExecutionContext();
      System.out.println("\nStarting : Thread" + i);
      System.out.println("fromId : " + fromId);
      System.out.println("toId : " + toId);
      value.putInt("fromId", fromId);
      value.putInt("toId", toId);
      // give each thread a name: Thread1, Thread2, ...
      value.putString("name", "Thread" + i);
      result.put("partition" + i, value);
      fromId = toId + 1;
      toId += range;
    }
    return result;
  }
}
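Note that the range is hardcoded to 10 ids per partition, which only works because the table holds exactly ids 1 to 100. If the id range is not fixed, one option is to derive it at runtime. A minimal sketch of the idea, assuming the partitioner is handed a JdbcTemplate (not part of the original class):

// Sketch only: derive fromId/toId from the table instead of hardcoding.
// Assumes a JdbcTemplate field has been injected into the partitioner.
int minId = jdbcTemplate.queryForObject("select min(id) from user", Integer.class);
int maxId = jdbcTemplate.queryForObject("select max(id) from user", Integer.class);
int range = (maxId - minId + 1) / gridSize; // assumes ids split evenly across partitions
int fromId = minId;
int toId = fromId + range - 1;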
The UserProcessor class prints the processing thread's name along with each item's id and username.
package com.walking.techie.processor;

import com.walking.techie.model.User;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

@Component
public class UserProcessor implements ItemProcessor<User, User> {

  private String threadName;

  public String getThreadName() {
    return threadName;
  }

  public void setThreadName(String threadName) {
    this.threadName = threadName;
  }

  @Override
  public User process(User item) throws Exception {
    System.out.println(threadName + " processing : " + item.getId() + " : " + item.getUsername());
    return item;
  }
}
This is a dummy tasklet that runs after all the threads have completed execution.
package com.walking.techie.tasklet;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

@Slf4j
public class DummyTasklet implements Tasklet {

  @Override
  public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
      throws Exception {
    log.info("Dummy Tasklet called.");
    return RepeatStatus.FINISHED;
  }
}
Run Application
package com.walking.techie;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }
}
Output
The output of the application is stored in the csv/outputs/ folder.

Output in console:
2017-03-29 20:49:56.538 INFO 44616 --- [ main] org.hibernate.Version : HHH000412: Hibernate Core {5.0.12.Final}
2017-03-29 20:49:56.539 INFO 44616 --- [ main] org.hibernate.cfg.Environment : HHH000206: hibernate.properties not found
2017-03-29 20:49:56.540 INFO 44616 --- [ main] org.hibernate.cfg.Environment : HHH000021: Bytecode provider name : javassist
2017-03-29 20:49:56.571 INFO 44616 --- [ main] o.hibernate.annotations.common.Version : HCANN000001: Hibernate Commons Annotations {5.0.1.Final}
2017-03-29 20:49:56.661 INFO 44616 --- [ main] org.hibernate.dialect.Dialect : HHH000400: Using dialect: org.hibernate.dialect.MySQL5Dialect
2017-03-29 20:49:56.815 INFO 44616 --- [ main] org.hibernate.tool.hbm2ddl.SchemaUpdate : HHH000228: Running hbm2ddl schema update
2017-03-29 20:49:56.843 INFO 44616 --- [ main] j.LocalContainerEntityManagerFactoryBean : Initialized JPA EntityManagerFactory for persistence unit 'default'
2017-03-29 20:49:56.929 INFO 44616 --- [ main] com.walking.techie.jobs.PartitionerJob : ...........called slave .........
2017-03-29 20:49:57.047 INFO 44616 --- [ main] com.walking.techie.jobs.PartitionerJob : queryProvider start
2017-03-29 20:49:57.052 INFO 44616 --- [ main] com.walking.techie.jobs.PartitionerJob : queryProvider end
2017-03-29 20:49:57.165 WARN 44616 --- [ main] o.s.b.a.batch.BasicBatchConfigurer : JPA does not support custom isolation levels, so locks may not be taken when launching Jobs
2017-03-29 20:49:57.169 INFO 44616 --- [ main] o.s.b.c.r.s.JobRepositoryFactoryBean : No database type set, using meta data indicating: MYSQL
2017-03-29 20:49:57.258 INFO 44616 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.
2017-03-29 20:49:57.275 INFO 44616 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executing SQL script from class path resource [org/springframework/batch/core/schema-mysql.sql]
2017-03-29 20:49:57.293 INFO 44616 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executed SQL script from class path resource [org/springframework/batch/core/schema-mysql.sql] in 17 ms.
2017-03-29 20:49:57.442 INFO 44616 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2017-03-29 20:49:57.451 INFO 44616 --- [ main] o.s.b.a.b.JobLauncherCommandLineRunner : Running default command line with: []
2017-03-29 20:49:57.571 INFO 44616 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=partitionJob]] launched with the following parameters: [{run.id=5}]
2017-03-29 20:49:57.585 INFO 44616 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [masterStep]
2017-03-29 20:49:57.591 INFO 44616 --- [ main] c.w.techie.partition.RangePartitioner : partition called gridsize= 10

Starting : Thread1
fromId : 1
toId : 10

Starting : Thread2
fromId : 11
toId : 20

Starting : Thread3
fromId : 21
toId : 30

Starting : Thread4
fromId : 31
toId : 40

Starting : Thread5
fromId : 41
toId : 50

Starting : Thread6
fromId : 51
toId : 60

Starting : Thread7
fromId : 61
toId : 70

Starting : Thread8
fromId : 71
toId : 80

Starting : Thread9
fromId : 81
toId : 90

Starting : Thread10
fromId : 91
toId : 100

2017-03-29 20:49:57.666 INFO 44616 --- [cTaskExecutor-4] com.walking.techie.jobs.PartitionerJob : slaveReader start 41 50
2017-03-29 20:49:57.668 INFO 44616 --- [cTaskExecutor-4] com.walking.techie.jobs.PartitionerJob : Parameter Value Thread5 {toId=50, fromId=41}
2017-03-29 20:49:57.670 INFO 44616 --- [cTaskExecutor-4] com.walking.techie.jobs.PartitionerJob : slaveReader end 41 50
2017-03-29 20:49:57.679 INFO 44616 --- [TaskExecutor-10] com.walking.techie.jobs.PartitionerJob : slaveReader start 71 80
2017-03-29 20:49:57.679 INFO 44616 --- [TaskExecutor-10] com.walking.techie.jobs.PartitionerJob : Parameter Value Thread8 {toId=80, fromId=71}
2017-03-29 20:49:57.679 INFO 44616 --- [TaskExecutor-10] com.walking.techie.jobs.PartitionerJob : slaveReader end 71 80
2017-03-29 20:49:57.680 INFO 44616 --- [cTaskExecutor-3] com.walking.techie.jobs.PartitionerJob : slaveReader start 11 20
2017-03-29 20:49:57.681 INFO 44616 --- [cTaskExecutor-3] com.walking.techie.jobs.PartitionerJob : Parameter Value Thread2 {toId=20, fromId=11}
2017-03-29 20:49:57.681 INFO 44616 --- [cTaskExecutor-3] com.walking.techie.jobs.PartitionerJob : slaveReader end 11 20
2017-03-29 20:49:57.682 INFO 44616 --- [cTaskExecutor-5] com.walking.techie.jobs.PartitionerJob : slaveReader start 91 100
2017-03-29 20:49:57.682 INFO 44616 --- [cTaskExecutor-5] com.walking.techie.jobs.PartitionerJob : Parameter Value Thread10 {toId=100, fromId=91}
2017-03-29 20:49:57.682 INFO 44616 --- [cTaskExecutor-5] com.walking.techie.jobs.PartitionerJob : slaveReader end 91 100
2017-03-29 20:49:57.683 INFO 44616 --- [cTaskExecutor-6] com.walking.techie.jobs.PartitionerJob : slaveReader start 51 60
2017-03-29 20:49:57.683 INFO 44616 --- [cTaskExecutor-6] com.walking.techie.jobs.PartitionerJob : Parameter Value Thread6 {toId=60, fromId=51}
2017-03-29 20:49:57.684 INFO 44616 --- [cTaskExecutor-6] com.walking.techie.jobs.PartitionerJob : slaveReader end 51 60
2017-03-29 20:49:57.685 INFO 44616 --- [cTaskExecutor-9] com.walking.techie.jobs.PartitionerJob : slaveReader start 31 40
2017-03-29 20:49:57.685 INFO 44616 --- [cTaskExecutor-9] com.walking.techie.jobs.PartitionerJob : Parameter Value Thread4 {toId=40, fromId=31}
2017-03-29 20:49:57.685 INFO 44616 --- [cTaskExecutor-9] com.walking.techie.jobs.PartitionerJob : slaveReader end 31 40
2017-03-29 20:49:57.686 INFO 44616 --- [cTaskExecutor-8] com.walking.techie.jobs.PartitionerJob : slaveReader start 1 10
2017-03-29 20:49:57.687 INFO 44616 --- [cTaskExecutor-8] com.walking.techie.jobs.PartitionerJob : Parameter Value Thread1 {toId=10, fromId=1}
2017-03-29 20:49:57.687 INFO 44616 --- [cTaskExecutor-8] com.walking.techie.jobs.PartitionerJob : slaveReader end 1 10
2017-03-29 20:49:57.688 INFO 44616 --- [cTaskExecutor-1] com.walking.techie.jobs.PartitionerJob : slaveReader start 61 70
2017-03-29 20:49:57.688 INFO 44616 --- [cTaskExecutor-1] com.walking.techie.jobs.PartitionerJob : Parameter Value Thread7 {toId=70, fromId=61}
2017-03-29 20:49:57.688 INFO 44616 --- [cTaskExecutor-1] com.walking.techie.jobs.PartitionerJob : slaveReader end 61 70
2017-03-29 20:49:57.689 INFO 44616 --- [cTaskExecutor-2] com.walking.techie.jobs.PartitionerJob : slaveReader start 81 90
2017-03-29 20:49:57.689 INFO 44616 --- [cTaskExecutor-2] com.walking.techie.jobs.PartitionerJob : Parameter Value Thread9 {toId=90, fromId=81}
2017-03-29 20:49:57.689 INFO 44616 --- [cTaskExecutor-2] com.walking.techie.jobs.PartitionerJob : slaveReader end 81 90
2017-03-29 20:49:57.690 INFO 44616 --- [cTaskExecutor-7] com.walking.techie.jobs.PartitionerJob : slaveReader start 21 30
2017-03-29 20:49:57.690 INFO 44616 --- [cTaskExecutor-7] com.walking.techie.jobs.PartitionerJob : Parameter Value Thread3 {toId=30, fromId=21}
2017-03-29 20:49:57.690 INFO 44616 --- [cTaskExecutor-7] com.walking.techie.jobs.PartitionerJob : slaveReader end 21 30
2017-03-29 20:49:57.748 INFO 44616 --- [cTaskExecutor-8] com.walking.techie.jobs.PartitionerJob : ********called slave processor **********
Thread1 processing : 1 : santosh
Thread1 processing : 2 : santosh
Thread1 processing : 3 : santosh
Thread1 processing : 4 : santosh
Thread1 processing : 5 : santosh
Thread1 processing : 6 : santosh
2017-03-29 20:49:57.787 INFO 44616 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step2]
2017-03-29 20:49:57.791 INFO 44616 --- [ main] com.walking.techie.tasklet.DummyTasklet : Dummy Tasklet called.
2017-03-29 20:49:57.801 INFO 44616 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=partitionJob]] completed with the following parameters: [{run.id=5}] and the following status: [COMPLETED]
Note: This code has been compiled and run on a Mac notebook with IntelliJ IDEA.
Nice share. Quite informative.

Nice tutorial. I am trying to run your source code locally. It's working fine, but I don't see the user processor being called. Could you please guide me? Do I need to add any settings?
Please compare your source code with the GitHub code:
https://github.com/walkingtechie/spring-batch-partition
If you clone the code from GitHub or follow the steps above, it should call the user processor.
Thanks for your reply! I have compared, and everything looks good to me, but I will verify again. One more thing: I have a requirement to call a third-party application through a web service:
1. Read the data from the DB.
2. Call the web service, then store the response back to the table.
3. Commit the transaction.
Where should I write the service invocation logic: in the processor or the writer? Could you guide me on this?
Install MySQL and run the queries below:

CREATE DATABASE spring_batch;

CREATE TABLE IF NOT EXISTS user (
  id int(5) NOT NULL AUTO_INCREMENT,
  username varchar(50) DEFAULT NULL,
  password varchar(20) DEFAULT NULL,
  age int(5) DEFAULT NULL,
  PRIMARY KEY (id));

INSERT INTO spring_batch.user (id, username, password, age) VALUES ('1', 'gaurav', 'root', '25');
You should write the logic that calls the web service in the processor. Once you get the response from the service, you can save it in the writer.
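For example, a minimal sketch of that split, where UserServiceClient and its call method are hypothetical placeholders for the real web service client:

// Sketch only: UserServiceClient is a hypothetical wrapper around the
// third-party web service; replace it with your actual client.
public class UserEnrichProcessor implements ItemProcessor<User, User> {

  private final UserServiceClient client; // hypothetical

  public UserEnrichProcessor(UserServiceClient client) {
    this.client = client;
  }

  @Override
  public User process(User item) throws Exception {
    // step 2: call the web service for this record
    String response = client.call(item.getId()); // hypothetical signature
    item.setUsername(response); // map the response onto whatever column you need
    return item; // the writer then persists it
  }
}

The writer would then store the enriched items back in the table (for example with a JdbcBatchItemWriter, sketched further below), and Spring Batch commits the chunk's transaction after the writer runs, which covers step 3.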
What's the role of chunk in the above example?

Chunk refers to chunk-oriented processing: instead of reading, processing, and writing all the records at once, the step reads, processes, and writes a fixed number of records (a chunk) at a time. You can apply chunk-oriented processing to a reader, a processor, and a writer over the data.
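For illustration, a sketch of the slave step with a chunk size of 10 instead of the 100 used above: the reader is called until 10 users are accumulated, all 10 pass through the processor, and the writer then receives them together in a single transaction. The bean name chunkedSlave is just for this example.

@Bean
public Step chunkedSlave() {
  return stepBuilderFactory.get("chunkedSlave")
      .<User, User>chunk(10) // read + process 10 items, then write them as one batch
      .reader(slaveReader(null, null, null))
      .processor(slaveProcessor(null))
      .writer(slaveWriter(null, null))
      .build();
}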
Can you please post the same example where we read data from one DB table and write it to another table (in the same or a different DB)? Here we are writing to CSV, but I need to write to a DB.
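The post does not include that variant, but one way is to replace the FlatFileItemWriter with Spring Batch's JdbcBatchItemWriter. A minimal sketch, where the user_processed target table and its columns are assumptions:

// import org.springframework.batch.item.database.JdbcBatchItemWriter;
// import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;

// Sketch only: writes each chunk into a table instead of a CSV file.
// The user_processed table is an assumption; adjust it to your schema.
@Bean
public JdbcBatchItemWriter<User> slaveDbWriter() {
  JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>();
  writer.setDataSource(dataSource);
  writer.setSql("insert into user_processed (id, username, password, age) "
      + "values (:id, :username, :password, :age)");
  // fills the named parameters from User's bean properties
  writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
  return writer;
}

You would then wire slaveDbWriter() into the slave step in place of slaveWriter(null, null).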