In Spring Batch 3.0 I'm trying to use the new job scope functionality for beans in both partitioned and multi-threaded steps (configured with a task:executor bean), and in both cases I get the exception:
Caused by: java.lang.IllegalStateException: No context holder available for job scope
at org.springframework.batch.core.scope.JobScope.getContext(JobScope.java:153)
at org.springframework.batch.core.scope.JobScope.get(JobScope.java:92)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:338)
If I make the beans step-scoped instead, it works fine.
I noticed the comment on JobSynchronizationManager which says
N.B. it is the responsibility of every {@link Job} implementation to ensure that a {@link JobContext} is available on every thread that might be involved in a job execution, including worker threads from a pool.
So I'm wondering whether I need to do something to set this up, or whether it's a bug in the job scope implementation that it doesn't set up the worker threads correctly.
StepSynchronizationManager has a similar comment - but in that case something is obviously setting up the threads correctly within the step.
Sample code to reproduce issue:
TestItemReader
package test;
import java.util.ArrayList;
import java.util.List;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.beans.factory.InitializingBean;
public class TestItemReader implements ItemReader<Integer>, InitializingBean {

    private List<Integer> items;

    // synchronized because several worker threads share this reader
    @Override
    public synchronized Integer read() throws Exception, UnexpectedInputException,
            ParseException, NonTransientResourceException {
        if (items.size() > 0) {
            return items.remove(0);
        }
        return null; // null signals the end of the input
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        System.out.println("Initialising reader");
        items = new ArrayList<Integer>();
        for (int i = 0; i < 100; i++) {
            items.add(i);
        }
    }
}
TestItemWriter
package test;
import java.util.List;
import org.springframework.batch.item.ItemWriter;
public class TestItemWriter implements ItemWriter<Integer> {

    @Override
    public void write(List<? extends Integer> items) throws Exception {
        for (int i : items) {
            System.out.println(Thread.currentThread().getName() + " Writing " + i);
        }
    }
}
test-job-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
           http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
           http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <job id="job" restartable="true" xmlns="http://www.springframework.org/schema/batch">
        <step id="index">
            <tasklet task-executor="executor">
                <chunk reader="itemReader" writer="itemWriter" commit-interval="5"/>
            </tasklet>
        </step>
    </job>

    <bean id="itemReader" class="test.TestItemReader" scope="job"/>
    <bean id="itemWriter" class="test.TestItemWriter"/>

    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource"/>
    </bean>

    <bean class="org.springframework.batch.test.JobLauncherTestUtils">
        <property name="job" ref="job"/>
    </bean>

    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>

    <batch:job-repository id="jobRepository"/>

    <jdbc:embedded-database id="dataSource" type="HSQL">
        <jdbc:script location="classpath:/org/springframework/batch/core/schema-hsqldb.sql"/>
    </jdbc:embedded-database>

    <task:executor id="executor" queue-capacity="0" pool-size="5"/>
</beans>
JobTest
package test;
import java.util.Collection;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
import org.springframework.test.context.ContextConfiguration;
@ContextConfiguration(locations={"test-job-context.xml"})
public class JobTest extends AbstractJUnit4SpringContextTests {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @BeforeClass
    public static void beforeClassSetup() {
        BasicConfigurator.configure();
        Logger.getRootLogger().setLevel(Level.WARN);
        Logger.getLogger("org.springframework.batch.core.scope.JobScope").setLevel(Level.DEBUG);
        Logger.getLogger("org.springframework.batch.core.scope.StepScope").setLevel(Level.DEBUG);
    }

    @Test
    public void testJobLaunch() throws Exception {
        JobExecution execution = jobLauncherTestUtils.launchJob();
        System.out.println("After execution " + execution);
        Collection<StepExecution> stepExecutions = execution.getStepExecutions();
        for (StepExecution stepExecution : stepExecutions) {
            System.out.println("StepExecution " + stepExecution);
        }
    }
}
Running the above JUnit test reproduces the issue. If you change the scope on the reader to step, or remove the scope, the test completes normally.
Multithreaded steps. By default, Spring Batch uses the same thread to execute a batch job from start to finish, meaning that everything runs sequentially. Spring Batch also allows multithreading at the step level. This makes it possible to process chunks using several threads.
As you probably know, the default bean scope in Spring is singleton. Declaring a Spring Batch component as step-scoped means that Spring Batch will use the Spring container to instantiate a new instance of that component for each step execution.
Job parameters and the job's execution context have reserved names, so the easiest way to access them is the Spring Expression Language. To access property 'xy' on the job parameters, use #{jobParameters[xy]}. To access property 'xy' on the job's execution context, use #{jobExecutionContext[xy]}.
Job scope is one such scope, provided by Spring Batch. It lets us define beans whose lifespan is tied to a running job execution: the bean is created when it is first needed within the job and is destroyed when the job terminates.
This is because the current execution (JobExecution in this case) is stored in a ThreadLocal (see org.springframework.batch.core.scope.context.SynchronizationManagerSupport), so it is not visible on worker threads started by a task executor. That being said, adding multithreaded support for this doesn't seem unreasonable. Feel free to create a Jira issue for it (and a pull request if you are so inclined).
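To make the ThreadLocal point concrete, here is a minimal plain-JDK sketch. It is not Spring Batch code: the class ThreadLocalContextDemo, the CONTEXT holder, and the helpers currentContext, withContext, and lookupOnWorker are all hypothetical names made up for illustration. It shows a value registered on the launching thread being invisible to a pool thread, and a wrapper that registers it on the worker for the duration of a task.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalContextDemo {

    // Hypothetical stand-in for the ThreadLocal that holds the current JobExecution.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Mimics a scope lookup: fails if nothing is registered on the calling thread.
    static String currentContext() {
        String context = CONTEXT.get();
        if (context == null) {
            throw new IllegalStateException("No context holder available");
        }
        return context;
    }

    // Wraps a task so the given context is registered on whichever thread runs it,
    // and cleared again afterwards so pooled threads do not keep a stale value.
    static Runnable withContext(String context, Runnable task) {
        return () -> {
            CONTEXT.set(context);
            try {
                task.run();
            } finally {
                CONTEXT.remove();
            }
        };
    }

    // Performs the lookup on a pool thread and reports what that thread observed.
    static String lookupOnWorker(boolean propagate) {
        String[] seen = new String[1];
        Runnable lookup = () -> {
            try {
                seen[0] = currentContext();
            } catch (IllegalStateException e) {
                seen[0] = "failed: " + e.getMessage();
            }
        };
        // CONTEXT.get() is evaluated here, on the calling thread.
        Runnable task = propagate ? withContext(CONTEXT.get(), lookup) : lookup;
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.submit(task).get(); // wait for the worker to finish
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        CONTEXT.set("jobExecution#1"); // registered on the launching thread only
        System.out.println(lookupOnWorker(false)); // failed: No context holder available
        System.out.println(lookupOnWorker(true));  // jobExecution#1
        CONTEXT.remove();
    }
}
```

In Spring Batch itself, the static methods JobSynchronizationManager.register(JobExecution) and JobSynchronizationManager.release() appear to play the role that CONTEXT.set and CONTEXT.remove play here, so a similar wrapper around the tasks handed to the task executor might work as a stopgap. That is an assumption and has not been tested against the job above.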