Spring Batch: Tasklet with multi-threaded executor has very bad performance related to the throttling algorithm

Using Spring Batch 2.2.1, I have configured a Spring Batch job using this approach:

  • http://static.springsource.org/spring-batch/reference/html/scalability.html#multithreadedStep

Configuration is the following:

  • The tasklet uses a ThreadPoolTaskExecutor limited to 15 threads

  • throttle-limit is equal to the number of threads

  • Chunk is used with:

    • a synchronized adapter around JdbcCursorItemReader, to allow its use by many threads as the Spring Batch documentation recommends (a sketch of such an adapter follows this list):

      You can synchronize the call to read() and as long as the processing and writing is the most expensive part of the chunk your step may still complete much faster than in a single threaded configuration.

    • saveState is false on JdbcCursorItemReader

    • a custom ItemWriter based on JPA. Note that the processing time of a single item can vary from a few milliseconds to more than a minute (> 60 s).

    • commit-interval set to 1 (I know it could be higher, but that's not the issue)

  • All JDBC pools are sized correctly, per the Spring Batch documentation's recommendations
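For reference, here is a minimal sketch of such a synchronized adapter (the class name SynchronizedItemReader is hypothetical; Spring Batch 2.2 has no built-in equivalent, so the delegate pattern below is one common way to wrap the cursor reader):

import org.springframework.batch.item.ItemReader;

// Thread-safe adapter that serializes calls to a non-thread-safe delegate
// such as JdbcCursorItemReader. Only read() is synchronized; processing and
// writing still run concurrently on the step's task executor.
public class SynchronizedItemReader<T> implements ItemReader<T> {

    private final ItemReader<T> delegate;

    public SynchronizedItemReader(ItemReader<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized T read() throws Exception {
        return delegate.read();
    }
}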

Running the batch leads to very strange and bad results, due to the following:

  • at some point, when one item takes a long time in the writer, nearly all threads in the thread pool end up doing nothing instead of processing; only the slow writer is working.

Looking at Spring Batch code, root cause seems to be in this package:

  • org/springframework/batch/repeat/support/

Is this way of working a feature, or is it a limitation/bug?

If it's a feature, is there a way, through configuration, to keep all threads working without being starved by the long-running items, and without having to rewrite everything?

Note that if all items take about the same time, everything works fine and multi-threading is OK; but if processing one item takes much longer, then multi-threading is nearly useless for as long as the slow item is being processed.

Note I opened this issue:

  • https://jira.springsource.org/browse/BATCH-2081
asked Aug 15 '13 by pmpm

3 Answers

As Alex said, it seems this behaviour is a contract, as per the javadocs:

      Subclasses just need to provide a method that gets the next result and one that waits for all the results to be returned from concurrent processes or threads.

Look at:

TaskExecutorRepeatTemplate#waitForResults
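To make that contract concrete, here is a deliberately simplified illustration (not the actual framework code, just the effect it produces under the throttle limit): work is dispatched in waves, and the dispatcher waits for the whole wave before handing out more, so one slow item idles every other worker until it finishes.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Simplified model of the "get next result / wait for all results" contract.
public class ThrottleDemo {

    public static void main(String[] args) throws Exception {
        int throttleLimit = 15;
        ExecutorService pool = Executors.newFixedThreadPool(throttleLimit);

        for (int wave = 0; wave < 3; wave++) {
            List<Future<?>> inFlight = new ArrayList<Future<?>>();
            for (int i = 0; i < throttleLimit; i++) {
                // One item per wave is pathologically slow, as in the question.
                final long workMillis = (i == 0) ? 60000 : 10;
                inFlight.add(pool.submit(new Runnable() {
                    public void run() {
                        try {
                            Thread.sleep(workMillis);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }));
            }
            // The "wait for all results" step: the 14 fast workers sit idle
            // until the slow task of the wave completes.
            for (Future<?> f : inFlight) {
                f.get();
            }
        }
        pool.shutdown();
    }
}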

Another option for you would be to use partitioning:

  • A TaskExecutorPartitionHandler that will execute items from the partitioned ItemReader; see below
  • A Partitioner implementation that computes the ranges to be processed by each ItemReader; see ColumnRangePartitioner below
  • A custom reader that will read data using the ranges the Partitioner has filled in; see the myItemReader configuration below

Michael Minella explains this in Chapter 11 of his book Pro Spring Batch:

<batch:job id="batchWithPartition">
    <batch:step id="step1.master">
        <batch:partition  partitioner="myPartitioner" handler="partitionHandler"/>
    </batch:step>       
</batch:job>
<!-- This one will create partitions of (number of lines / grid size) -->
<bean id="myPartitioner" class="....ColumnRangePartitioner"/>
<!-- This one will handle every partition in a Thread -->
<bean id="partitionHandler" class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="multiThreadedTaskExecutor"/>
    <property name="step" ref="step1" />
    <property name="gridSize" value="10" />
</bean>
<batch:step id="step1">
        <batch:tasklet transaction-manager="transactionManager">
            <batch:chunk reader="myItemReader"
                writer="manipulatableWriterForTests" commit-interval="1"
                skip-limit="30000">
                <batch:skippable-exception-classes>
                    <batch:include class="java.lang.Exception" />
                </batch:skippable-exception-classes>
            </batch:chunk>
        </batch:tasklet>
</batch:step>
<!-- scope="step" is critical here -->
<bean id="myItemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql">
        <value>
            <![CDATA[
                select * from customers where id >= ? and id <=  ?
            ]]>
        </value>
    </property>
    <property name="preparedStatementSetter">
        <bean class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
            <property name="parameters">
                <list>
                    <!-- minValue and maxValue are filled in by the Partitioner for each partition in an ExecutionContext -->
                    <value>#{stepExecutionContext[minValue]}</value>
                    <value>#{stepExecutionContext[maxValue]}</value>
                </list>
            </property>
        </bean>
    </property>
    <property name="rowMapper" ref="customerRowMapper"/>
</bean>

ColumnRangePartitioner.java:

package ...;

import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcTemplate;

public class ColumnRangePartitioner implements Partitioner {

    private JdbcTemplate jdbcTemplate;
    private String column;
    private String table;

    public Map<String, ExecutionContext> partition(int gridSize) {
        int min = queryForInt("SELECT MIN(" + column + ") from " + table);
        int max = queryForInt("SELECT MAX(" + column + ") from " + table);
        int targetSize = (max - min) / gridSize;
        System.out.println("Our partition size will be " + targetSize);
        System.out.println("We will have " + gridSize + " partitions");
        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
        int number = 0;
        int start = min;
        int end = start + targetSize - 1;
        while (start <= max) {
            ExecutionContext value = new ExecutionContext();
            result.put("partition" + number, value);
            if (end >= max) {
                end = max;
            }
            // Each partition gets its own [minValue, maxValue] range in its
            // ExecutionContext; the step-scoped reader picks these up via
            // #{stepExecutionContext[...]}.
            value.putInt("minValue", start);
            value.putInt("maxValue", end);
            System.out.println("minValue = " + start);
            System.out.println("maxValue = " + end);
            start += targetSize;
            end += targetSize;
            number++;
        }
        System.out.println("We are returning " + result.size() + " partitions");
        return result;
    }

    private int queryForInt(String sql) {
        return jdbcTemplate.queryForObject(sql, Integer.class);
    }

    public void setDataSource(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }

    public void setColumn(String column) {
        this.column = column;
    }

    public void setTable(String table) {
        this.table = table;
    }
}
answered by UBIK LOAD PACK


Here's what I think is going on:

  • As you said, your ThreadPoolTaskExecutor is limited to 15 threads
  • The framework's "chunk" is causing each item in the JdbcCursorItemReader (up to the thread limit) to be executed in a different thread
  • But the Spring Batch framework is also waiting for each of the threads (i.e., all 15) to complete its individual read/process/write flow before moving on to the next chunk, given your commit interval of 1. On occasion, this leaves 14 threads waiting almost 60 seconds on a sibling thread that is taking forever to complete.

In other words, for this multi-threaded approach in Spring Batch to be helpful, each thread needs to take about the same amount of time to process. Given your scenario, where there is a huge disparity between the processing times of certain items, you are experiencing a limitation: many of your threads have completed and are waiting on a long-running sibling thread before the step can move on to the next chunk of processing.

My suggestions:

  • Generally, I'd say that increasing your commit interval should help somewhat, since it allows more than one cursor item to be processed in a single thread between commits, even if one of the threads is stuck on a long-running write. However, if you're unlucky, multiple long transactions could occur in the same thread and make matters worse (e.g., 120 seconds between commits in a single thread with a commit interval of 2).
  • Specifically, I'd suggest increasing your thread pool size to a large number, even exceeding your maximum database connections by 2x or 3x; a sketch follows this list. Even though some of your threads will block trying to acquire a connection (because of the large pool size), you'll actually see an increase in throughput, as your long-running threads no longer stop other threads from taking new items from the cursor and continuing your batch job's work in the meantime. At the beginning of a chunk, the number of pending threads will greatly exceed the number of available database connections, so the OS scheduler will churn a bit as it activates and deactivates threads blocked on acquiring a connection. However, since most of your threads complete their work and release their connection relatively quickly, overall throughput improves: many threads keep acquiring connections, doing work, releasing connections, and allowing further threads to do the same, even while your long-running threads are doing their thing.
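For illustration, a minimal sketch of an oversized executor along these lines (the pool sizes are assumptions to be adapted to your own connection pool; the bean would be referenced from the tasklet's task-executor attribute):

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class ExecutorConfig {

    // Assumption: the JDBC pool holds ~15-20 connections, so the thread pool
    // is sized at roughly 3x that. Threads beyond the number of free
    // connections simply block on getConnection() until one is released.
    public static ThreadPoolTaskExecutor oversizedTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(50);
        executor.setMaxPoolSize(50);
        executor.setThreadNamePrefix("batch-worker-");
        executor.afterPropertiesSet(); // only needed outside a Spring context
        return executor;
    }
}

Remember to raise the step's throttle-limit as well, since it caps concurrency regardless of the pool size.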
answered by Alex


In my case, if I don't set the throttle-limit, then only 4 threads enter the read() method of the ItemReader, which is also the default number of threads when none is specified on the tasklet tag, as per the Spring Batch documentation.

If I specify more threads, e.g. 10, 20, or 100, then only 8 threads enter the read() method of the ItemReader (see the sketch below for where the throttle limit is configured).
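For context, a sketch of where that default comes from: the tasklet's throttle-limit attribute ends up on TaskExecutorRepeatTemplate, whose limit defaults to 4 when not set. The wiring below is illustrative, not how the XML namespace parser literally builds the step:

import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

public class ThrottleLimitExample {

    public static TaskExecutorRepeatTemplate chunkTemplate() {
        TaskExecutorRepeatTemplate template = new TaskExecutorRepeatTemplate();
        template.setTaskExecutor(new SimpleAsyncTaskExecutor());
        // Equivalent of throttle-limit="20" on the tasklet tag; without this
        // call the limit stays at the default of 4.
        template.setThrottleLimit(20);
        return template;
    }
}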

answered by Harsh Gupta