The working configuration for the step in question is the following:
I wanted to analyze the performance of the Jdbc Cursor Item Reader in "myStep", however after the first commit, the second chunk's first read would fail with java.sql.SQLException: Result set already closed.
I suspected it might be JTA / XA driver closing the cursor for some reason, so I gave "myStep" a simple datasource transaction manager (on the datasource the reader was using), and the step was able to complete successfully. This isn't a solution, since this breaks transactionally integrity of the step.
Should I be able to use a cursor reader inside of a JTA managed step (using the environment described below)? If so, what might be configured incorrectly on my end?
Environment
<bean id="myTransactionManager"
class="org.springframework.transaction.jta.JtaTransactionManager"/>
Config
<bean id="myTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
<bean id="myDataSource" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName" value="jdbc/myDataSource"/>
<property name="proxyInterface" value="javax.sql.DataSource"/>
</bean>
<batch:step id="myStep" job-repository="myJobRepositoryFactory">
<batch:tasklet transaction-manager="myTransactionManager">
<batch:chunk
reader="myReader"
processor="myProcessor"
writer="myWriter"
commit-interval="100"
processor-transactional="false"/>
<batch:listeners>
<batch:listener ref="myListener"/>
</batch:listeners>
</batch:tasklet>
</batch:step>
<bean id="myReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
<property name="dataSource" ref="myDataSource"/>
<property name="sql" value="SELECT * FROM myHugeTable ORDER BY myColumn DESC"/>
<property name="rowMapper">
<bean class="myRowMapper"/>
</property>
</bean>
Caught in the act
Below is the call stack of the result set being closed before the next chunk's read. Notice XA Connection closing all statements, which causes JDBC to close all results sets.
java.lang.Thread.State: RUNNABLE
at weblogic.jdbc.wrapper.ResultSet.internalClose(ResultSet.java:178)
at weblogic.jdbc.wrapper.Statement.closeAllResultSets(Statement.java:286)
at weblogic.jdbc.wrapper.Statement.internalClose(Statement.java:395)
at weblogic.jdbc.wrapper.Statement.internalClose(Statement.java:367)
at weblogic.jdbc.wrapper.XAConnection.closeAllStatements(XAConnection.java:393)
at weblogic.jdbc.wrapper.XAConnection.cleanup(XAConnection.java:406)
at weblogic.jdbc.wrapper.XAConnection.releaseToPool(XAConnection.java:432)
at weblogic.jdbc.jta.DataSource.removeTxAssoc(DataSource.java:1907)
at weblogic.jdbc.jta.DataSource.prepare(DataSource.java:1090)
at weblogic.transaction.internal.XAServerResourceInfo.prepare(XAServerResourceInfo.java:1408)
at weblogic.transaction.internal.XAServerResourceInfo.prepare(XAServerResourceInfo.java:522)
at weblogic.transaction.internal.ServerSCInfo.startPrepare(ServerSCInfo.java:411)
at weblogic.transaction.internal.ServerTransactionImpl.localPrepare(ServerTransactionImpl.java:2709)
at weblogic.transaction.internal.ServerTransactionImpl.globalPrepare(ServerTransactionImpl.java:2340)
at weblogic.transaction.internal.ServerTransactionImpl.internalCommit(ServerTransactionImpl.java:300)
at weblogic.transaction.internal.ServerTransactionImpl.commit(ServerTransactionImpl.java:260)
at org.glassfish.transaction.TransactionManagerImplCommon.commit(TransactionManagerImplCommon.java:571)
at org.springframework.transaction.jta.JtaTransactionManager.doCommit(JtaTransactionManager.java:1021)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:761)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:730)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:150)
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:271)
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:77)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:368)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144)
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:257)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:198)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:64)
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:67)
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:165)
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144)
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:134)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:304)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:135)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
You should be able to use the cursor reader inside a JTA managed step. We are doing exactly this in the project I'm working on. We use Atomikos as XA TM.
Here is our XA/JTA configuration, that we use. Perhaps it is of some use for you:
@Bean(initMethod = "init", destroyMethod = "shutdownForce")
public UserTransactionService userTransactionService() {
return new UserTransactionServiceImp(userTransactionServiceProperties());
}
@Bean(initMethod = "init", destroyMethod = "close")
@DependsOn("userTransactionService")
public UserTransactionManager atomikosTransactionManager() {
UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setForceShutdown(true);
userTransactionManager.setStartupTransactionService(false);
return userTransactionManager;
}
@Bean
@DependsOn("userTransactionService")
public UserTransaction atomikosUserTransaction() throws SystemException {
return new UserTransactionImp();
}
@Bean
@DependsOn("userTransactionService")
public JtaTransactionManager transactionManager() throws SystemException {
JtaTransactionManager jtaTransactionManager = new JtaTransactionManager();
jtaTransactionManager.setTransactionManager(atomikosTransactionManager());
jtaTransactionManager.setUserTransaction(atomikosUserTransaction());
jtaTransactionManager.setAllowCustomIsolationLevels(true);
return jtaTransactionManager;
}
All our datasources are instantiated as org.springframework.boot.jta.atomikos.AtomikosDataSourceBean. E.g., a Ora-datasource is instantiated like this:
AtomikosDataSourceBean oraXaDs = new AtomikosDataSourceBean();
oraXaDs.setXaDataSourceClassName(oraDsProp.getDatasourceClass());
oraXaDs.setUniqueResourceName(oraDsProp.getInstancename());
oraXaDs.setMinPoolSize(oraDsProp.getPoolMinSize());
oraXaDs.setMaxPoolSize(oraDsProp.getPoolMaxSize());
oraXaDs.setTestQuery(oraDsProp.getValidConnectionSQL());
Properties oraXaDsProps = oraXaDs.getXaProperties();
oraXaDsProps.setProperty("user", oraDsProp.getUser());
oraXaDsProps.setProperty("password", oraDsProp.getPassword());
oraXaDsProps.setProperty("URL", oraDsProp.getUrl());
My two cents on this issue:
First some insight :
Reading from a database cursor means opening a connection, firing one SQL statement against it and constantly reading rows during the whole batch job. That makes sense, because often input data of a job can be characterized by one SQL statement, but executing it and reading all the data from the ResultSet upfront is of course no solution. We just have one problem here with reading constantly: committing the transaction would close the connection. So how do we keep it open? Simple solution: it doesn’t take part in the transaction. Spring Batch’s JdbcCursorItemReader uses a separate connection for opening the cursor, thereby bypassing the transaction managed by the transaction manager. In an application server environment we have to do a little bit more to make it work. Normally we get connections from a DataSource managed by the application server, and all of those connections take part in transactions by default. We need to set up a separate DataSource which does not take part in transactions, and only inject it into our cursor based readers. Injecting them anywhere else could cause a lot of damage regarding transaction safety.
Your problem is in your step basically :(from whatever i can conclude without viewing your datasource xml file :) )
Step, Spring Batch Job Repository, and business repositories (using various datasources) all use a JTA transaction manager.
The JTA transaction manager spring provides should be used in a way that weblogic handles your JTA transactions instead.What you need is only configure the datasource (which should be also under the container) to use XA-aware drivers, and in your app, use the corresponding transaction manager, and lookup the XA-aware datasources by JNDI
However if you cannot rely on the container (e.g. you are writing a standalone app, and etc), you will need to find an transaction manager that is capable on that. Atomikos is one the the most famous free JTA/XA library.
Having said that,after you have configured with either the JNDI way or Atomikos way,here are the configuration which should be kept in mind while using multiple datasources:
<batch:tasklet>
<batch:transaction-attributes isolation="READ_COMMITTED" propagation="REQUIRES_NEW" timeout="200"/>
<batch:chunk reader="myItemReader" writer="myItemWriter" commit-interval="20"/>
</batch:tasklet>
Hope this clears out some air on this issue.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With