 

Processing huge data with spring batch partitioning

I am implementing a Spring Batch job to process millions of records in a DB table using the partitioning approach, as follows:

  1. Fetch the unique partitioning codes from the table in a partitioner and set them in the execution context.

  2. Create a chunk step with a reader, processor and writer to process the records for a particular partition code.

Is this approach proper, or is there a better approach for a situation like this? Some partition codes can have more records than others, so those with more records might take longer to process than the ones with fewer records.

Is it possible to create partitions/threads so that, e.g., thread 1 processes records 1-1000, thread 2 processes 1001-2000, etc.?

How do I control the number of threads created? There can be around 100 partition codes, but I would like to create only 20 threads and process them in 5 iterations.

What happens if one partition fails? Will all processing stop and be reverted?

Following are my configurations:

 <bean id="MyPartitioner" class="com.MyPartitioner" />
 <bean id="itemProcessor" class="com.MyProcessor" scope="step" />
 <bean id="itemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step" >
  <property name="dataSource" ref="dataSource"/>
  <property name="sql" value="select * from mytable WHERE code = '#{stepExecutionContext[code]}' "/>
  <property name="rowMapper">
      <bean class="com.MyRowMapper" scope="step"/>
  </property>
</bean>
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >
    <property name="corePoolSize" value="20"/>
    <property name="maxPoolSize" value="20"/>
    <property name="allowCoreThreadTimeOut" value="true"/>
</bean>

<batch:step id="Step1" xmlns="http://www.springframework.org/schema/batch">
    <batch:tasklet transaction-manager="transactionManager">
        <batch:chunk reader="itemReader"  processor="itemProcessor" writer="itemWriter" commit-interval="200"/>
    </batch:tasklet>
</batch:step>
<batch:job id="myjob">
    <batch:step id="mystep">
        <batch:partition step="Step1" partitioner="MyPartitioner">
            <batch:handler grid-size="20" task-executor="taskExecutor"/>
        </batch:partition>
    </batch:step>
</batch:job>

Partitioner -

public class MyPartitioner implements Partitioner {

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> partitionMap = new HashMap<String, ExecutionContext>();
        List<String> codes = getCodes();

        for (String code : codes) {
            ExecutionContext context = new ExecutionContext();
            context.put("code", code);
            partitionMap.put(code, context);
        }
        return partitionMap;
    }
}

Thanks

asked Mar 24 '15 by springenthusiast



1 Answer

I would say it is the right approach. I do not see why you need one thread per 1000 items: if you partition by unique partitioning codes and use a chunk of 1000 items, each thread will commit transactions of 1000 items at a time, which is in my opinion fine.

  1. In addition to saving the unique partitioning codes, you can count how many records each partition code has and partition even further, by creating a new sub-context for every 1000 records of the same partition code. That way a partition code with e.g. 2200 records will produce 3 partitions with context params: 1 => partition_key=key1, skip=0, count=1000; 2 => partition_key=key1, skip=1000, count=1000; and 3 => partition_key=key1, skip=2000, count=1000. That gives you what you asked for, but I would still go without it.

  2. The number of threads is controlled by the ThreadPoolTaskExecutor which is passed to the partition step. You can set corePoolSize to 20 and you will get at most 20 threads. The next, finer-grained configuration is grid-size, which tells how many partitions should be created out of the full partition map. So partitioning is about dividing the work; after that, your threading configuration defines the concurrency of the actual processing.

  3. If one partition fails, the whole partitioned step fails, with information about which partition failed. Successful partitions are done and will not be invoked again; when the job restarts, it will pick up where it left off by redoing the failed and unprocessed partitions.
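To illustrate point 2, here is a sketch of how the beans from the question interact (the comments are my reading of the semantics; with one partition per code, roughly 100 partitions queue up on the 20-thread pool and complete in about 5 waves):

```xml
<!-- at most 20 partitions run concurrently; the rest wait in the pool's queue -->
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="20"/>
    <property name="maxPoolSize" value="20"/>
</bean>

<batch:step id="mystep">
    <batch:partition step="Step1" partitioner="MyPartitioner">
        <!-- grid-size is passed to partition(gridSize) as a hint; MyPartitioner
             above ignores it and creates one context per code, so the actual
             partition count is the number of codes (~100), not 20 -->
        <batch:handler grid-size="20" task-executor="taskExecutor"/>
    </batch:partition>
</batch:step>
```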

Hope I picked up all the questions you had, since there were many.

Example of case 1 (maybe there are mistakes, but just to get the idea):

public class MyPartitioner implements Partitioner {

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> partitionMap = new HashMap<String, ExecutionContext>();
        Map<String, Integer> codesWithCounts = getCodesWithCounts();

        for (Entry<String, Integer> codeWithCount : codesWithCounts.entrySet()) {
            String code = codeWithCount.getKey();
            for (int i = 0; i < codeWithCount.getValue(); i += 1000) {
                ExecutionContext context = new ExecutionContext();
                context.put("code", code);
                context.put("skip", i);
                context.put("count", 1000);
                // the map key must be unique per partition, so include the offset
                partitionMap.put(code + "-" + i, context);
            }
        }
        return partitionMap;
    }
}

And then you page by 1000, getting from the context how many records to skip, which in the 2200-record example will be: 0, 1000 and 2000.
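A sketch of how the reader from the question could consume those skip/count values; the LIMIT ... OFFSET syntax is a MySQL-style assumption, and the ORDER BY column (id) is hypothetical but some stable sort key is needed so the slices do not overlap:

```xml
<bean id="itemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
    <property name="dataSource" ref="dataSource"/>
    <!-- ORDER BY a stable key so each partition reads a disjoint 1000-row slice -->
    <property name="sql"
              value="select * from mytable WHERE code = '#{stepExecutionContext[code]}' ORDER BY id LIMIT #{stepExecutionContext[count]} OFFSET #{stepExecutionContext[skip]}"/>
    <property name="rowMapper">
        <bean class="com.MyRowMapper" scope="step"/>
    </property>
</bean>
```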

answered Oct 19 '22 by Nenad Bozic