I am currently developing data loaders.Reading a file and writing to database. I am using partition handler to process multiple Comma Separated files in 30 threads. I want to scale and have throughput.Daily i receive 15000 files(each having 1 million records ) , how do i scale using spring batch.i want the job to complete this within a day.Do we have any open source grid computing , that can do this fairly, or is there any simple fine tuning steps.
The spring batch data loader runs stand alone. There is no web container involved. it runs on single solaris machine having 24 cpus. The data is written in to single database.default isolation and propagation is provided.The xml config is given below:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">
<!-- IMPORT DB CONFIG -->
<import resource="classpath:bom/bom/bomloader/job/DataSourcePoolConfig.xml" />
<!-- USE ANNOTATIONS TO CONFIGURE SPRING BEANS -->
<context:component-scan base-package="bom.bom.bom" />
<!-- INJECT THE PROCESS PARAMS HASHMAP BEFORE CONTEXT IS INITIALISED -->
<bean id="holder" class="bom.bom.bom.loader.util.PlaceHolderBean" >
<property name="beanName" value="holder"/>
</bean>
<bean id="logger" class="bom.bom.bom.loader.util.PlaceHolderBean" >
<property name="beanName" value="logger"/>
</bean>
<bean id="dataMap" class="java.util.concurrent.ConcurrentHashMap" />
<!-- JOB REPOSITORY - WE USE DATABASE REPOSITORY -->
<!-- <bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean" >-->
<!-- <property name="transactionManager" ref="frdtransactionManager" />-->
<!-- <property name="dataSource" ref="frddataSource" />-->
<!-- <property name="databaseType" value="oracle" />-->
<!-- <property name="tablePrefix" value="batch_"/> -->
<!-- </bean>-->
<!-- JOB REPOSITORY - WE IN MEMORY REPOSITORY -->
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="frdtransactionManager" />
</bean>
<!-- <bean id="jobExplorer" class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">-->
<!-- <property name="dataSource" ref="frddataSource" />-->
<!-- <property name="tablePrefix" value="batch_"/> -->
<!-- </bean>-->
<!-- LAUNCH JOBS FROM A REPOSITORY -->
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
<property name="taskExecutor">
<bean class="org.springframework.core.task.SyncTaskExecutor" />
</property>
</bean>
<!-- CONFIGURE SCHEDULING IN QUARTZ -->
<!-- <bean id="jobDetail" class="org.springframework.scheduling.quartz.JobDetailBean">-->
<!-- <property name="jobClass" value="bom.bom.bom.assurance.core.JobLauncherDetails" />-->
<!-- <property name="group" value="quartz-batch" />-->
<!-- <property name="jobDataAsMap">-->
<!-- <map>-->
<!-- <entry key="jobName" value="${jobname}"/>-->
<!-- <entry key="jobLocator" value-ref="jobRegistry"/>-->
<!-- <entry key="jobLauncher" value-ref="jobLauncher"/>-->
<!-- </map>-->
<!-- </property>-->
<!-- </bean>-->
<!-- RUN EVERY 2 HOURS -->
<!-- <bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">-->
<!-- <property name="triggers">-->
<!-- <bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">-->
<!-- <property name="jobDetail" ref="jobDetail" />-->
<!-- <property name="cronExpression" value="2/0 * * * * ?" />-->
<!-- </bean>-->
<!-- </property>-->
<!-- </bean>-->
<!-- -->
<!-- RUN STANDALONE -->
<bean id="jobRunner" class="bom.bom.bom.loader.core.DataLoaderJobRunner">
<constructor-arg value="${LOADER_NAME}" />
</bean>
<!-- Get all the files for the exchanges and feed as resource to the MultiResourcePartitioner -->
<bean id="fileresource" class="bom.bom.bom.loader.util.FiltersFoldersResourceFactory" p:dataMap-ref="dataMap">
<property name="filePath" value="${PARENT_PATH}" />
<property name="acceptedFolders" value="${EXCH_CODE}" />
<property name="logger" ref="logger" />
</bean>
<!-- The network Data Loading Configuration goes here -->
<job id="CDR_network _PARALLEL" xmlns="http://www.springframework.org/schema/batch" restartable="false" >
<step id="PREPARE_CLEAN" >
<flow parent="prepareCleanFlow" />
<next on="COMPLETED" to="LOAD_EXCHANGE_DATA" />
<fail on="FAILED" exit-code="Failed on cleaning error records."/>
</step>
<step id="LOAD_EXCHANGE_DATA" >
<tasklet ref="businessData" transaction-manager="ratransactionManager" />
<next on="COMPLETED" to="LOAD_CDR_FILES" />
<fail on="FAILED" exit-code="FAILED ON LOADING EXCHANGE INFORMATION FROM DB." />
</step>
<step id="LOAD_CDR_FILES" >
<tasklet ref="fileresource" transaction-manager="frdtransactionManager" />
<next on="COMPLETED" to="PROCESS_FILE_TO_STAGING_TABLE_PARALLEL" />
<fail on="FAILED" exit-code="FAILED ON LOADING CDR FILES." />
</step>
<step id="PROCESS_FILE_TO_STAGING_TABLE_PARALLEL" next="limitDecision" >
<partition step="filestep" partitioner="filepartitioner" >
<handler grid-size="100" task-executor="executorWithCallerRunsPolicy" />
</partition>
</step>
<decision id="limitDecision" decider="limitDecider">
<next on="COMPLETED" to="MOVE_RECS_STAGING_TO_MAIN_TABLE" />
<next on="CONTINUE" to="PROCESS_FILE_TO_STAGING_TABLE_PARALLEL" />
</decision>
<step id="MOVE_RECS_STAGING_TO_MAIN_TABLE" >
<tasklet ref="moveRecords" transaction-manager="ratransactionManager" >
<transaction-attributes isolation="SERIALIZABLE"/>
</tasklet>
<fail on="FAILED" exit-code="FAILED ON MOVING DATA TO THE MAIN TABLE." />
<next on="*" to="PREPARE_ARCHIVE"/>
</step>
<step id="PREPARE_ARCHIVE" >
<flow parent="prepareArchiveFlow" />
<fail on="FAILED" exit-code="FAILED ON Archiving files" />
<end on="*" />
</step>
</job>
<flow id="prepareCleanFlow" xmlns="http://www.springframework.org/schema/batch">
<step id="CLEAN_ERROR_RECORDS" next="archivefileExistsDecisionInFlow" >
<tasklet ref="houseKeeping" transaction-manager="ratransactionManager" />
</step>
<decision id="archivefileExistsDecisionInFlow" decider="archivefileExistsDecider">
<end on="NO_ARCHIVE_FILE" />
<next on="ARCHIVE_FILE_EXISTS" to="runprepareArchiveFlow" />
</decision>
<step id="runprepareArchiveFlow" >
<flow parent="prepareArchiveFlow" />
</step>
</flow>
<flow id="prepareArchiveFlow" xmlns="http://www.springframework.org/schema/batch" >
<step id="ARCHIVE_CDR_FILES" >
<tasklet ref="archiveFiles" transaction-manager="frdtransactionManager" />
</step>
</flow>
<bean id="archivefileExistsDecider" class="bom.bom.bom.loader.util.ArchiveFileExistsDecider" >
<property name="logger" ref="logger" />
<property name="frdjdbcTemplate" ref="frdjdbcTemplate" />
</bean>
<bean id="filepartitioner" class="org.springframework.batch.core.partition.support.MultiResourcePartitioner" scope="step" >
<property name="resources" value="#{dataMap[processFiles]}"/>
</bean>
<task:executor id="executorWithCallerRunsPolicy"
pool-size="90-95"
queue-capacity="6"
rejection-policy="CALLER_RUNS"/>
<!-- <bean id="dynamicJobParameters" class="bom.bom.bom.assurance.core.DynamicJobParameters" />-->
<bean id="houseKeeping" class="bom.bom.bom.loader.core.HousekeepingOperation">
<property name="logger" ref="logger" />
<property name="jdbcTemplate" ref="rajdbcTemplate" />
<property name="frdjdbcTemplate" ref="frdjdbcTemplate" />
</bean>
<bean id="businessData" class="bom.bom.bom.loader.core.BusinessValidatorData">
<property name="logger" ref="logger" />
<property name="jdbcTemplate" ref="NrajdbcTemplate" />
<property name="param" value="${EXCH_CODE}" />
<property name="sql" value="${LOOKUP_QUERY}" />
</bean>
<step id="filestep" xmlns="http://www.springframework.org/schema/batch">
<tasklet transaction-manager="ratransactionManager" allow-start-if-complete="true" >
<chunk writer="jdbcItenWriter" reader="fileItemReader" processor="itemProcessor" commit-interval="500" retry-limit="2">
<retryable-exception-classes>
<include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
</retryable-exception-classes>
</chunk>
<listeners>
<listener ref="customStepExecutionListener">
</listener>
</listeners>
</tasklet>
</step>
<bean id="moveRecords" class="bom.bom.bom.loader.core.MoveDataFromStaging">
<property name="logger" ref="logger" />
<property name="jdbcTemplate" ref="rajdbcTemplate" />
</bean>
<bean id="archiveFiles" class="bom.bom.bom.loader.core.ArchiveCDRFile" >
<property name="logger" ref="logger" />
<property name="jdbcTemplate" ref="frdjdbcTemplate" />
<property name="archiveFlag" value="${ARCHIVE_FILE}" />
<property name="archiveDir" value="${ARCHIVE_LOCATION}" />
</bean>
<bean id="limitDecider" class="bom.bom.bom.loader.util.LimitDecider" p:dataMap-ref="dataMap">
<property name="logger" ref="logger" />
</bean>
<!-- <bean id="multifileReader" class="org.springframework.batch.item.file.MultiResourceItemReader" scope="step" >-->
<!-- <property name="resources" value="#{stepExecutionContext[fileName]}" />-->
<!-- <property name="delegate" ref="fileItemReader" />-->
<!-- </bean>-->
<!-- READ EACH FILE PARALLELY -->
<bean id="fileItemReader" scope="step" autowire-candidate="false" parent="itemReaderParent">
<property name="resource" value="#{stepExecutionContext[fileName]}" />
<property name="saveState" value="false" />
</bean>
<!-- LISTEN AT THE END OF EACH FILE TO DO POST PROCESSING -->
<bean id="customStepExecutionListener" class="bom.bom.bom.loader.core.StagingStepExecutionListener" scope="step">
<property name="logger" ref="logger" />
<property name="frdjdbcTemplate" ref="frdjdbcTemplate" />
<property name="jdbcTemplate" ref="rajdbcTemplate" />
<property name="sql" value="${INSERT_IA_QUERY_COLUMNS}" />
</bean>
<!-- CONFIGURE THE ITEM PROCESSOR TO DO BUSINESS LOGIC ON EACH ITEM -->
<bean id="itemProcessor" class="bom.bom.bom.loader.core.StagingLogicProcessor" scope="step">
<property name="logger" ref="logger" />
<property name="params" ref="businessData" />
</bean>
<!-- CONFIGURE THE JDBC ITEM WRITER TO WRITE IN TO DB -->
<bean id="jdbcItenWriter" class="org.springframework.batch.item.database.JdbcBatchItemWriter" scope="step">
<property name="dataSource" ref="radataSource"/>
<property name="sql">
<value>
<![CDATA[
${SQL1A}
]]>
</value>
</property>
<property name="itemSqlParameterSourceProvider">
<bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider">
</bean>
</property>
</bean>
<!-- <bean id="itemWriter" class="bom.bom.bom.assurance.core.LoaderDBWriter" scope="step">-->
<!-- <property name="sQL" value="${loader.sql}" />-->
<!-- <property name="jdbcTemplate" ref="NrajdbcTemplate" />-->
<!-- </bean>-->
<!-- CONFIGURE THE FLAT FILE ITEM READER TO READ INDIVIDUAL BATCH -->
<bean id="itemReaderParent" class="org.springframework.batch.item.file.FlatFileItemReader" abstract="true">
<property name="strict" value="false"/>
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean class="org.springframework.batch.item.file.transform.FixedLengthTokenizer">
<property name="names" value="${COLUMNS}" />
<property name="columns" value="${RANGE}" />
</bean>
</property>
<property name="fieldSetMapper">
<bean class="bom.bom.bom.loader.util.DataLoaderMapper">
<property name="params" value="${BEANPROPERTIES}"/>
</bean>
</property>
</bean>
</property>
</bean>
</beans>
Tried:
i could see that the ThreadPoolExecutor hangs after 3 hours.The prstat in solaris says it is processing, but no processing in the log.
Tried with less chunk size 500 ,due memory foot print,no progress.
Since it inserts in to single database( 30 pooled connections).is there anythin i can do here.
Instances from visual vm
stacktrace of thread all are locked at connection level
Full thread dump Java HotSpot(TM) Server VM (11.3-b02 mixed mode):
"Attach Listener" daemon prio=3 tid=0x00bbf800 nid=0x26 waiting on condition [0x00000000..0x00000000]
java.lang.Thread.State: RUNNABLE
"executorWithCallerRunsPolicy-1" prio=3 tid=0x008a7000 nid=0x25 runnable [0xd5a7d000..0xd5a7fb70]
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:129)
at oracle.net.ns.Packet.receive(Packet.java:240)
at oracle.net.ns.DataPacket.receive(DataPacket.java:92)
at oracle.net.ns.NetInputStream.getNextPacket(NetInputStream.java:172)
at oracle.net.ns.NetInputStream.read(NetInputStream.java:117)
at oracle.net.ns.NetInputStream.read(NetInputStream.java:92)
at oracle.net.ns.NetInputStream.read(NetInputStream.java:77)
at oracle.jdbc.driver.T4CMAREngine.unmarshalUB1(T4CMAREngine.java:1034)
at oracle.jdbc.driver.T4CMAREngine.unmarshalSB1(T4CMAREngine.java:1010)
at oracle.jdbc.driver.T4C8Oall.receive(T4C8Oall.java:588)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:194)
at oracle.jdbc.driver.T4CPreparedStatement.executeForRows(T4CPreparedStatement.java:953)
at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1222)
at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3387)
at oracle.jdbc.driver.OraclePreparedStatement.executeUpdate(OraclePreparedStatement.java:3468)
- locked <0xdbdafa30> (a oracle.jdbc.driver.T4CConnection)
at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeUpdate(OraclePreparedStatementWrapper.java:1350)
at org.springframework.jdbc.core.JdbcTemplate$2.doInPreparedStatement(JdbcTemplate.java:818)
at org.springframework.jdbc.core.JdbcTemplate$2.doInPreparedStatement(JdbcTemplate.java:1)
at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:587)
at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:812)
at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:868)
at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:876)
at
Multithreaded steps. By default, Spring Batch uses the same thread to execute a batch job from start to finish, meaning that everything runs sequentially. Spring Batch also allows multithreading at the step level. This makes it possible to process chunks using several threads.
A Step is an independent, specific phase of a batch Job , such that every Job is composed of one or more Step s. Similar to a Job , a Step has an individual StepExecution that represents a single attempt to execute a Step .
Spring Batch Parallel Processing is each chunk in its own thread by adding a task executor to the step. If there are a million records to process and each chunk is 1000 records, and the task executor exposes four threads, you can handle 4000 records in parallel instead of 1000 records.
I would suggest you lower the chunk size to 50.
500 seems to be too big : you wait too much while talking with the DB.
At the same time, lower the TaskExecutor's pool size or increase your DB pool size. You can choose which on by watching your DB host : if it's CPU and IO is not maxxed, increase your DB pool size to increase the DB load. If your DB CPU is already at it's maximum, lower the TaskExecutor's pool size. The objective is to have a fluid process.
I think the DB will be your main limitating element. So begin by adjusting the DB pool size according to the DB host capacities. When it's done, adjust your TaskExecutor's pool size according to the DB pool size (TE pool size = DB pool size * 1.5), plus the batch's host capacities (CPU, memory and IOs).
Splitting your incoming files on multiple hard drives may help too (if possible).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With