Spring Batch Multi Threading - How to make each thread read unique records?

This question has been asked many times in many forums, but I haven't found an answer that fits my case. I am trying to implement a multi-threaded Step in my Spring Batch job.

  1. I have a staging table with 100k records.

  2. I want to process it in 10 threads with a commit interval of 300 per thread, so 3,000 records are being processed at any point in time.

  3. I defined a task executor and referenced it inside the step I want to multi-thread.

  4. My idea is to first get the thread pool size (10) and update the thread_id column of each of the 100k records with a value from 1 to 10. With 10 threads and 100k records, 10k records would be assigned each id. I am trying to implement a StagingStepListener to do this.

  5. I wrote a reader for this staging table. The task executor will create 10 readers, and each reader must read 300 different records and process them. Now, how do I pass a common id between the step listener and the reader so that each thread has its own set of records to process?
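The arithmetic behind steps 2 and 4 can be checked with a small stdlib-only sketch. It assumes, hypothetically, that thread ids are handed out round-robin by row position (the question does not say how the ids are chosen), and it models only the counts, not the database update or any Spring Batch component.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the question's plan: tag each staging row with a
// thread_id in 1..poolSize, then let worker N read only the rows tagged N.
final class ThreadIdAssignment {

    // Round-robin assignment: row 0 -> 1, row 1 -> 2, ..., row 9 -> 10, row 10 -> 1, ...
    static int threadIdFor(long rowIndex, int poolSize) {
        return (int) (rowIndex % poolSize) + 1;
    }

    // Counts how many rows each thread_id receives.
    static Map<Integer, Long> rowsPerThread(long totalRows, int poolSize) {
        Map<Integer, Long> counts = new TreeMap<>();
        for (long i = 0; i < totalRows; i++) {
            counts.merge(threadIdFor(i, poolSize), 1L, Long::sum);
        }
        return counts;
    }

    // Number of commits (chunks) one thread needs for its rows at a given commit interval.
    static long chunksPerThread(long rowsForThread, int commitInterval) {
        return (rowsForThread + commitInterval - 1) / commitInterval; // ceiling division
    }

    public static void main(String[] args) {
        Map<Integer, Long> counts = rowsPerThread(100_000, 10);
        System.out.println("rows for thread 1: " + counts.get(1));
        System.out.println("commits per thread: " + chunksPerThread(counts.get(1), 300));
    }
}
```

With 10,000 rows per id and a commit interval of 300, each thread needs 34 commits, the last one holding only 100 rows.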

For now I have only one JVM, so I am thinking of doing this with a multi-threaded step rather than a partition-based approach.

Please help.

I followed the Pro Spring Batch book and created a staging step listener that accepts a run id from the job configuration XML through job parameters, as below:

<!-- scope="step" is what allows #{jobParameters[...]} to be resolved when the step runs -->
<beans:bean id="stagingStepListener"
        class="com.apress.springbatch.statement.listener.StagingStepListener"
        scope="step">
    <beans:property name="dataSource" ref="dataSource"/>
    <beans:property name="tableName" value="transaction"/>
    <beans:property name="whereClause"
            value="where jobId is null and processed is null"/>
    <!-- the key contains a dot, so quote it inside the SpEL index -->
    <beans:property name="jobId" value="#{jobParameters['run.id']}"/>
</beans:bean>

What I can't find is where this "run.id" comes from. I don't see it defined anywhere in the book. I copied the same implementation into my Spring Batch job, and when I run it I get an exception saying that run.id is not identifiable. How do I do this?

juniorbansal asked Jan 31 '12 22:01


1 Answer


  • What I can't find is: where is this "run.id" coming from?

JobParameters

It is just a parameter that you pass in the JobParameters. "run.id" is a conventional name: a different run.id is usually passed for each job instance, because the framework has no way of knowing which changes to the JobParameters make it the "next" job instance. (Spring Batch's RunIdIncrementer, for example, increments a parameter named run.id.)

You can pass this "run.id" to the jobParameters as:

new JobParametersBuilder().addLong("run.id", 1L).toJobParameters()

Take a look at the JobParametersIncrementer documentation for more details.
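Because the framework cannot tell what makes a run the "next" one, the usual pattern is to bump run.id on every launch. Below is a stdlib-only sketch of that idea. It models job parameters as a plain Map (an assumption for illustration; the real type is JobParameters) and is not the Spring Batch API.

```java
import java.util.HashMap;
import java.util.Map;

// Stdlib-only imitation of the idea behind an incrementer such as Spring Batch's
// RunIdIncrementer. This is NOT the Spring Batch API: parameters are a plain Map.
final class RunIdSketch {

    static final String RUN_ID_KEY = "run.id";

    // Returns a new parameter map: a copy of `previous` with run.id bumped by one,
    // or run.id = 1 when there is no previous run (or it had no run.id).
    static Map<String, Object> next(Map<String, Object> previous) {
        Map<String, Object> result = new HashMap<>();
        if (previous != null) {
            result.putAll(previous);
        }
        Object current = result.get(RUN_ID_KEY);
        long id = (current instanceof Long) ? (Long) current + 1 : 1L;
        result.put(RUN_ID_KEY, id);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> first = next(null);
        Map<String, Object> second = next(first);
        System.out.println(first.get(RUN_ID_KEY) + " then " + second.get(RUN_ID_KEY));
    }
}
```

With the real API, the value produced this way ends up in the JobParameters, which is what #{jobParameters[...]} in a step-scoped bean is resolved against when the step runs. If the job is launched without that parameter, there is nothing for the expression to resolve.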


  • How do I pass a common id between the step listener and the reader so that each thread has its own set of records to process?

Don't

This is quite a dangerous route, since many participants in a Step (e.g. readers and writers) are stateful, and if that state is not segregated by thread, those components cannot be used safely in a multi-threaded Step. In particular, most of the off-the-shelf readers and writers in Spring Batch are not designed for multi-threaded use.
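To make "stateful" concrete, here is a hypothetical, stdlib-only reader (not a Spring Batch class) whose only state is a cursor field, shared by every thread that calls it. Marking read() as synchronized is what keeps ten threads from receiving the same row twice; remove it and the cursor update becomes a race.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory reader with a cursor field, standing in for the kind of
// stateful component the answer warns about (it is not a Spring Batch class).
final class SharedCursorReader {
    private final List<Integer> rows;
    private int position = 0; // mutable state seen by every thread sharing this instance

    SharedCursorReader(List<Integer> rows) {
        this.rows = rows;
    }

    // Builds a reader over the ids 1..n.
    static SharedCursorReader ofIds(int n) {
        List<Integer> rows = new ArrayList<>(n);
        for (int i = 1; i <= n; i++) {
            rows.add(i);
        }
        return new SharedCursorReader(rows);
    }

    // Returns the next row, or null at end of input. Without `synchronized`, two
    // threads could read `position` before either increments it, handing out the
    // same row twice and skipping another.
    synchronized Integer read() {
        return position < rows.size() ? rows.get(position++) : null;
    }

    // Lets `threads` workers drain one shared reader; returns {distinct rows, total reads}.
    static int[] drain(SharedCursorReader reader, int threads) {
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        AtomicInteger reads = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                Integer row;
                while ((row = reader.read()) != null) {
                    seen.add(row);
                    reads.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {seen.size(), reads.get()};
    }

    public static void main(String[] args) {
        int[] result = drain(ofIds(100_000), 10);
        System.out.println(result[0] + " distinct rows, " + result[1] + " reads");
    }
}
```

Synchronizing read() only prevents duplicate reads. A single shared cursor still cannot say which chunks each thread has actually committed, so any restart position saved from it is unreliable, which is part of why sharing stateful readers across threads is risky.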

Partitioning

I would recommend using Partitioning. It is a lot simpler than it seems, and you can still use multiple threads with it. Take a look at the partitioning sample job from the "Spring Batch samples", which is there to:

show multi-threaded step execution using the PartitionHandler SPI. The example uses a TaskExecutorPartitionHandler to spread the work of reading some files across multiple threads, with one Step execution per thread. The key components are the PartitionStep and the MultiResourcePartitioner which is responsible for dividing up the work. Notice that the readers and writers in the Step that is being partitioned are step-scoped, so that their state does not get shared across threads of execution.
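The partitioning idea applied to the staging table can be sketched without Spring at all. This hypothetical, stdlib-only version splits an id range into contiguous slices (similar in spirit to the ColumnRangePartitioner in the Spring Batch samples, but not that class) and gives each slice its own task with its own cursor. In a real job, the partitioner would put each slice's bounds into its StepExecution's ExecutionContext, and a step-scoped reader would pick them up through late binding such as #{stepExecutionContext[...]}.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stdlib-only sketch of range partitioning; not the Spring Batch API.
final class RangePartitionSketch {

    // One slice of the id range, playing the role of one partitioned StepExecution.
    static final class Partition {
        final long minId;
        final long maxId;

        Partition(long minId, long maxId) {
            this.minId = minId;
            this.maxId = maxId;
        }
    }

    // Splits [minId, maxId] into at most gridSize contiguous, non-overlapping slices.
    static List<Partition> partition(long minId, long maxId, int gridSize) {
        long total = maxId - minId + 1;
        long size = (total + gridSize - 1) / gridSize; // ceiling division
        List<Partition> result = new ArrayList<>();
        for (long start = minId; start <= maxId; start += size) {
            result.add(new Partition(start, Math.min(start + size - 1, maxId)));
        }
        return result;
    }

    // Runs one task per partition on a thread pool. Each task keeps its own cursor
    // (the local `id`), so no reader state is shared between threads.
    static long processAll(List<Partition> partitions, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (Partition p : partitions) {
                Callable<Long> task = () -> {
                    long processed = 0;
                    for (long id = p.minId; id <= p.maxId; id++) {
                        processed++; // stand-in for reading, processing and writing row `id`
                    }
                    return processed;
                };
                futures.add(pool.submit(task));
            }
            long total = 0;
            for (Future<Long> f : futures) {
                total += f.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Partition> partitions = partition(1, 100_000, 10);
        System.out.println(partitions.size() + " partitions, "
                + processAll(partitions, 10) + " rows processed");
    }
}
```

For 100,000 ids and a grid size of 10, this yields ten slices of 10,000 ids each. Because no cursor is shared, the read loop needs no synchronization, which is the property the step-scoped readers in the sample provide.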

tolitius answered Oct 25 '22 03:10