Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

In Spring Batch, can multiple JdbcBatchItemWriters be configured to write in parallel?

In my Spring Batch job, the item processor splits each object read by the item reader into seven lists of variable length. These lists have to be written to seven tables in the DB, and any error (for example, the DB rejecting records for any reason) must cause the transaction to roll back on all seven tables.

Currently, I create a wrapper object holding these seven lists and pass it to a custom item writer. For each batch of wrapper objects returned by the item processor, the writer merges their contents into its own seven lists, so that it performs only seven batched writes (using DAOs based on JdbcTemplate).

My writer calls the insert function for each of these tables sequentially, and I would like to speed this up. I was wondering whether I could write the lists to their respective tables in parallel, so that the overall execution time is the time of the longest write. One requirement I cannot compromise on is that everything must happen in a single transaction, which must be rolled back if any of the writers throws an exception.

like image 427
Wolvington Avatar asked Feb 22 '13 17:02

Wolvington


1 Answer

Here's a simple solution that uses a TaskExecutor and extends org.springframework.batch.item.support.CompositeItemWriter.

package de.incompleteco.spring.batch.item.support;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.core.task.TaskExecutor;
import org.springframework.util.Assert;

import de.incompleteco.spring.domain.SimpleEntity;

/**
 * A {@link CompositeItemWriter} that hands the same chunk of items to every delegate writer
 * concurrently through a {@link TaskExecutor} and then blocks until all delegates have finished.
 *
 * <p>If any delegate fails, the first failure (in delegate order) is rethrown from
 * {@link #write(List)} once every delegate has completed. The step therefore observes the error
 * instead of it being lost on a worker thread.
 *
 * <p><b>Transaction caveat:</b> Spring binds transactional resources to the current thread.
 * Delegates executed on other threads by the {@code taskExecutor} do not automatically join the
 * transaction of the calling (chunk) thread. Verify rollback semantics for your setup before
 * relying on this writer for all-or-nothing writes. If the executor runs tasks on the calling
 * thread (e.g. a synchronous executor), the delegates run inside the caller's transaction.
 */
public class ParallelCompositeItemWriter extends CompositeItemWriter<SimpleEntity> {

    /** Local copy of the delegates so {@link #write(List)} can iterate over them. */
    private List<ItemWriter<? super SimpleEntity>> delegates;

    /** Executor that runs each delegate's write; must be set before use. */
    private TaskExecutor taskExecutor;

    /**
     * Writes {@code items} with every delegate in parallel and waits for all of them to finish.
     *
     * @param items the chunk to write; passed unchanged to each delegate
     * @throws Exception the first exception thrown by a delegate (in delegate order), rethrown
     *     only after all delegates have completed
     * @throws InterruptedException if the calling thread is interrupted while waiting; the
     *     interrupt flag is restored and outstanding tasks are cancelled
     */
    @Override
    public void write(final List<? extends SimpleEntity> items) throws Exception {
        final List<FutureTask<Void>> tasks = new ArrayList<FutureTask<Void>>(delegates.size());
        for (final ItemWriter<? super SimpleEntity> writer : delegates) {
            // FutureTask is a Runnable, so it works with any TaskExecutor while still letting
            // us wait for completion and retrieve the delegate's exception.
            final FutureTask<Void> task = new FutureTask<Void>(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    writer.write(items);
                    return null;
                }
            });
            tasks.add(task);
            taskExecutor.execute(task);
        }
        awaitAll(tasks);
    }

    /**
     * Blocks until every task has finished, then rethrows the first recorded failure, if any.
     * Waiting for all tasks (rather than bailing out on the first failure) ensures no delegate
     * is still writing when control returns to the step.
     */
    private static void awaitAll(List<FutureTask<Void>> tasks) throws Exception {
        Throwable firstFailure = null;
        for (FutureTask<Void> task : tasks) {
            try {
                task.get();
            } catch (ExecutionException e) {
                if (firstFailure == null) {
                    firstFailure = (e.getCause() != null) ? e.getCause() : e;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                for (FutureTask<Void> pending : tasks) {
                    pending.cancel(true);
                }
                throw e;
            }
        }
        if (firstFailure instanceof Exception) {
            throw (Exception) firstFailure;
        }
        if (firstFailure instanceof Error) {
            throw (Error) firstFailure;
        }
        if (firstFailure != null) {
            throw new IllegalStateException(firstFailure);
        }
    }

    /**
     * Sets the executor used to run each delegate's write.
     *
     * @param taskExecutor the executor; validated as non-null in {@link #afterPropertiesSet()}
     */
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    /**
     * Stores the delegates locally and also passes them to the parent class.
     *
     * @param delegates the writers that each receive every chunk
     */
    @Override
    public void setDelegates(List<ItemWriter<? super SimpleEntity>> delegates) {
        this.delegates = delegates;
        super.setDelegates(delegates);
    }

    /**
     * Runs the parent's validation, then checks that a task executor was configured.
     *
     * @throws Exception if validation fails
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        super.afterPropertiesSet();
        Assert.notNull(taskExecutor,"Task executor needs to be set");
    }

}

An example configuration would look something like this:

<!-- Single-step job: "reader" supplies items and "writer" writes them in chunks of 10. -->
<batch:job id="simpleJob">
    <batch:step id="simpleJob.step1">
        <batch:tasklet>
            <batch:chunk reader="reader" writer="writer" commit-interval="10"/>
        </batch:tasklet>
    </batch:step>
</batch:job>

<!-- Reader that iterates over the in-memory "itemList" defined below. -->
<bean id="reader" class="org.springframework.batch.item.support.IteratorItemReader">
    <constructor-arg ref="itemList"/>
</bean>

<!-- Composite writer: hands each chunk to every bean in "writerDelegates",
     running them on "writerTaskExecutor". -->
<bean id="writer" class="de.incompleteco.spring.batch.item.support.ParallelCompositeItemWriter">
    <property name="delegates" ref="writerDelegates"/>
    <property name="taskExecutor" ref="writerTaskExecutor"/>
</bean>

<!-- One JdbcBatchItemWriter per target database. Each binds bean properties of the
     item to the named SQL parameters (:idCol, :stuff). -->
<util:list id="writerDelegates">
    <bean class="org.springframework.batch.item.database.JdbcBatchItemWriter">
        <property name="dataSource" ref="dataSource1"/>
        <property name="sql" value="insert into test_table (idcol,stuff) values (:idCol,:stuff)"/>
        <property name="itemSqlParameterSourceProvider">
            <bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider"/>
        </property>
    </bean>
    <bean class="org.springframework.batch.item.database.JdbcBatchItemWriter">
        <property name="dataSource" ref="dataSource2"/>
        <property name="sql" value="insert into test_table (idcol,stuff) values (:idCol,:stuff)"/>
        <property name="itemSqlParameterSourceProvider">
            <bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider"/>
        </property>
    </bean>     
</util:list>

<!-- Sample input data consumed by "reader". -->
<util:list id="itemList">
    <bean class="de.incompleteco.spring.domain.SimpleEntity">
        <constructor-arg value="stuff1"/>
    </bean>
    <bean class="de.incompleteco.spring.domain.SimpleEntity">
        <constructor-arg value="stuff2"/>
    </bean>     
    <bean class="de.incompleteco.spring.domain.SimpleEntity">
        <constructor-arg value="stuff3"/>
    </bean>     
</util:list>

<!-- Thread pool used by the parallel writer. NOTE(review): keep pool-size at least
     the number of delegates so all writes can run concurrently. -->
<task:executor id="writerTaskExecutor" pool-size="3"/>


<!-- First XA-capable H2 in-memory database ("a"), pooled and enlisted by Bitronix.
     Bitronix requires every pooled resource to have a distinct uniqueName. A bare
     timestamp can repeat when both data sources are created within the same
     millisecond, so each name carries a distinct literal prefix. -->
<bean id="dataSource1" class="bitronix.tm.resource.jdbc.PoolingDataSource" init-method="init" destroy-method="close">
    <property name="className" value="org.h2.jdbcx.JdbcDataSource" />
    <property name="uniqueName" value="dataSource1-#{T(System).currentTimeMillis()}" />
    <property name="allowLocalTransactions" value="true"/>
    <property name="maxPoolSize" value="2" />
    <property name="driverProperties">
        <props>
            <prop key="URL">jdbc:h2:mem:a;DB_CLOSE_DELAY=-1</prop>
        </props>
    </property>
</bean>

<!-- Second XA-capable H2 in-memory database ("b"); distinct uniqueName prefix as above. -->
<bean id="dataSource2" class="bitronix.tm.resource.jdbc.PoolingDataSource" init-method="init" destroy-method="close">
    <property name="className" value="org.h2.jdbcx.JdbcDataSource" />
    <property name="uniqueName" value="dataSource2-#{T(System).currentTimeMillis()}" />
    <property name="allowLocalTransactions" value="true"/>
    <property name="maxPoolSize" value="2" />
    <property name="driverProperties">
        <props>
            <prop key="URL">jdbc:h2:mem:b;DB_CLOSE_DELAY=-1</prop>
        </props>
    </property>
</bean>

<!-- Create the same schema in both databases at startup. -->
<jdbc:initialize-database  data-source="dataSource1">
    <jdbc:script location="classpath:/META-INF/sql/schema-h2.sql"/>
</jdbc:initialize-database>

<jdbc:initialize-database  data-source="dataSource2">
    <jdbc:script location="classpath:/META-INF/sql/schema-h2.sql"/>
</jdbc:initialize-database>
<!-- XA (JTA) transaction management: the Bitronix transaction manager is exposed to
     Spring through a JtaTransactionManager bean named "transactionManager". -->

<!-- Bitronix configuration singleton; must exist before the transaction manager starts. -->
<bean id="btmConfig" factory-method="getConfiguration" class="bitronix.tm.TransactionManagerServices"/>

<!-- Bitronix transaction manager; shut down when the context is closed. -->
<bean id="BitronixTransactionManager" factory-method="getTransactionManager"
    class="bitronix.tm.TransactionManagerServices" depends-on="btmConfig" destroy-method="shutdown" />

<!-- Spring adapter; the same Bitronix bean serves as both TransactionManager and UserTransaction. -->
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager" ref="BitronixTransactionManager" />
    <property name="userTransaction" ref="BitronixTransactionManager" />
</bean>

This example uses the following:

  • Bitronix JTA to support transactions across multiple databases
  • a very simple mapping of a simple entity to a simple JDBC record

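(The database setup is deliberately crude and is only an example. Note that Spring binds transactions to the current thread, so writes performed on executor threads do not automatically join the chunk's transaction. Verify the rollback behavior before relying on it.)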

like image 93
incomplete-co.de Avatar answered Nov 03 '22 09:11

incomplete-co.de