Using Spring Batch 2.2.1, I have configured a Spring Batch job using the following approach:
The configuration is as follows:
The tasklet uses a ThreadPoolTaskExecutor limited to 15 threads
throttle-limit is set equal to the number of threads
Chunk is used with:
A synchronized adapter around JdbcCursorItemReader, to allow its use by many threads as per the Spring Batch documentation recommendation:
"You can synchronize the call to read() and as long as the processing and writing is the most expensive part of the chunk your step may still complete much faster than in a single threaded configuration."
saveState is false on JdbcCursorItemReader
A custom ItemWriter based on JPA. Note that the processing time of a single item can vary from a few milliseconds to several seconds (sometimes > 60 s).
commit-interval set to 1 (I know it could be larger, but that is not the issue)
All JDBC pools are sized correctly, per the Spring Batch documentation recommendations.
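The synchronized reader adapter mentioned above can be sketched as a plain delegating wrapper. This is a minimal stand-alone version for illustration: the `ItemReader` interface here is a local stand-in mirroring `org.springframework.batch.item.ItemReader`, and the class names are hypothetical, so the snippet runs without Spring on the classpath. It shows that serializing `read()` is enough for several threads to safely drain a non-thread-safe source (such as a JDBC cursor), each item being read exactly once:

```java
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SynchronizedReaderDemo {

    // Local stand-in mirroring org.springframework.batch.item.ItemReader
    interface ItemReader<T> {
        T read() throws Exception;
    }

    // Delegating wrapper that serializes read() calls, as the docs recommend
    static class SynchronizedItemReader<T> implements ItemReader<T> {
        private final ItemReader<T> delegate;

        SynchronizedItemReader(ItemReader<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public synchronized T read() throws Exception {
            return delegate.read();
        }
    }

    /** Reads n items with 4 concurrent threads; returns how many distinct items were seen. */
    static int concurrentReadCount(int n) throws Exception {
        // Non-thread-safe source: a plain iterator, standing in for a JDBC cursor
        Iterator<Integer> source =
                IntStream.range(0, n).boxed().collect(Collectors.toList()).iterator();
        ItemReader<Integer> reader =
                new SynchronizedItemReader<>(() -> source.hasNext() ? source.next() : null);

        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int t = 0; t < 4; t++) {
            pool.submit(() -> {
                Integer item;
                while ((item = reader.read()) != null) {
                    seen.add(item); // every item shows up once, no duplicates, no losses
                }
                return null;
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return seen.size();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(concurrentReadCount(100)); // prints 100
    }
}
```

In the real job, the delegate would be the `JdbcCursorItemReader` with saveState=false, as described above.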
Running the batch leads to very strange and bad results, due to the following:
Looking at the Spring Batch code, the root cause seems to be in this package:
Is this way of working a feature, or is it a limitation/bug?
If it is a feature, is there a way, by configuration, to keep all threads busy without them being starved by long-running work, and without having to rewrite everything?
Note that if all items take the same amount of time, everything works fine and multi-threading is OK; but if one item's processing takes much longer, then multi-threading is nearly useless for as long as the slow processing runs.
Note I opened this issue:
As Alex said, it seems this behaviour is a contract, as per the javadocs of:
"Subclasses just need to provide a method that gets the next result and one that waits for all the results to be returned from concurrent processes or threads."
Look at:
TaskExecutorRepeatTemplate#waitForResults
Another option for you would be to use partitioning:
Michael Minella explains this in Chapter 11 of his book Pro Spring Batch:
<batch:job id="batchWithPartition">
    <batch:step id="step1.master">
        <batch:partition partitioner="myPartitioner" handler="partitionHandler"/>
    </batch:step>
</batch:job>

<!-- This one will create partitions of (number of lines / grid size) -->
<bean id="myPartitioner" class="....ColumnRangePartitioner"/>

<!-- This one will handle every partition in a thread -->
<bean id="partitionHandler" class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="multiThreadedTaskExecutor"/>
    <property name="step" ref="step1" />
    <property name="gridSize" value="10" />
</bean>

<batch:step id="step1">
    <batch:tasklet transaction-manager="transactionManager">
        <batch:chunk reader="myItemReader" writer="manipulatableWriterForTests" commit-interval="1" skip-limit="30000">
            <batch:skippable-exception-classes>
                <batch:include class="java.lang.Exception" />
            </batch:skippable-exception-classes>
        </batch:chunk>
    </batch:tasklet>
</batch:step>

<!-- Step scope is critical here -->
<bean id="myItemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql">
        <value>
            <![CDATA[
                select * from customers where id >= ? and id <= ?
            ]]>
        </value>
    </property>
    <property name="preparedStatementSetter">
        <bean class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
            <property name="parameters">
                <list>
                    <!-- minValue and maxValue are filled in by the Partitioner for each partition in an ExecutionContext -->
                    <value>#{stepExecutionContext[minValue]}</value>
                    <value>#{stepExecutionContext[maxValue]}</value>
                </list>
            </property>
        </bean>
    </property>
    <property name="rowMapper" ref="customerRowMapper"/>
</bean>
Partitioner.java:
package ...;

import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcTemplate;

public class ColumnRangePartitioner implements Partitioner {

    private String column;
    private String table;
    private JdbcTemplate jdbcTemplate; // needed for the min/max queries below

    public Map<String, ExecutionContext> partition(int gridSize) {
        int min = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") from " + table, Integer.class);
        int max = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") from " + table, Integer.class);
        // The +1 avoids a zero target size (infinite loop) and yields exactly gridSize partitions
        int targetSize = (max - min) / gridSize + 1;
        System.out.println("Our partition size will be " + targetSize);
        System.out.println("We will have " + gridSize + " partitions");

        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
        int number = 0;
        int start = min;
        int end = start + targetSize - 1;
        while (start <= max) {
            ExecutionContext value = new ExecutionContext();
            result.put("partition" + number, value);
            if (end >= max) {
                end = max;
            }
            // minValue/maxValue are read by the step-scoped reader via the ExecutionContext
            value.putInt("minValue", start);
            value.putInt("maxValue", end);
            System.out.println("minValue = " + start);
            System.out.println("maxValue = " + end);
            start += targetSize;
            end += targetSize;
            number++;
        }
        System.out.println("We are returning " + result.size() + " partitions");
        return result;
    }

    public void setColumn(String column) {
        this.column = column;
    }

    public void setTable(String table) {
        this.table = table;
    }

    public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }
}
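The range math in the partitioner can be sanity-checked in isolation. The sketch below (class and method names are mine) reimplements just the loop, using targetSize = (max - min) / gridSize + 1 as the official Spring Batch samples do; without the +1 a target size of 0 would make the loop spin forever. For min=1, max=100, gridSize=10 it yields exactly 10 non-overlapping id ranges:

```java
import java.util.ArrayList;
import java.util.List;

public class ColumnRangeCheck {

    /** Recomputes the [minValue, maxValue] ranges the partitioner would emit. */
    static List<int[]> ranges(int min, int max, int gridSize) {
        // +1 keeps targetSize > 0 and yields exactly gridSize partitions
        int targetSize = (max - min) / gridSize + 1;
        List<int[]> result = new ArrayList<>();
        int start = min;
        int end = start + targetSize - 1;
        while (start <= max) {
            result.add(new int[] {start, Math.min(end, max)}); // clamp the last range to max
            start += targetSize;
            end += targetSize;
        }
        return result;
    }

    public static void main(String[] args) {
        List<int[]> parts = ranges(1, 100, 10);
        System.out.println(parts.size()); // prints 10
        int[] last = parts.get(parts.size() - 1);
        System.out.println(last[0] + ".." + last[1]); // prints 91..100
    }
}
```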
Here's what I think is going on:
In other words, for this multi-threaded approach in Spring Batch to be helpful, each thread needs to process in about the same amount of time. Given your scenario where there is a huge disparity between the processing time of certain items, you are experiencing a limitation where many of your threads are complete and waiting on a long-running sibling thread to be able to move onto the next chunk of processing.
My suggestion:
In my case, if I don't set the throttle-limit, only 4 threads enter the read() method of the ItemReader, which is also the default number of threads when none is specified on the tasklet tag, as per the Spring Batch documentation.
If I specify more threads, e.g. 10, 20 or 100, still only 8 threads enter the read() method of the ItemReader.