 

Partitioned Job can't stop by itself after finishing? Spring Batch

I wrote a Job with two Steps, one of which is a partitioned step. The partitioned step uses a TaskExecutorPartitionHandler and runs 5 slave steps in threads. The job is started from the main() method. But it does not stop after every slave ItemReader has returned null (the end-of-input signal). Even after the program has executed the last line of code in main() (which is System.out.println("Finished")), the process won't exit: it hangs in memory doing nothing, and I have to press the stop button in Eclipse's panel to kill it.

The following is the JobExecution returned by JobLauncher.run(); it shows that the Job completed successfully:

JobExecution: id=0, version=2, startTime=Fri Nov 27 06:05:23 CST 2015, endTime=Fri Nov 27 06:05:39 CST 2015, lastUpdated=Fri Nov 27 06:05:39 CST 2015, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=0, version=0, Job=[jobCensoredPages]], jobParameters=[{}]

7217
Finished

Why does a Spring Batch program whose Job completed successfully still hang? Please point me to where I should look. I suspect that the multithreading part managed by Spring Batch does not stop.

Simple job-run code:

        Job job = (Job) context.getBean("jobPages");
        try {
            // Empty parameter set; pass it to run() instead of creating a second one
            JobParameters p = new JobParametersBuilder()
                .toJobParameters();

            JobExecution result = launcher.run(job, p);

            System.out.println(result.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
        context.getBean("idSet");
        AtomicInteger n = (AtomicInteger) context.getBean("pageCount");
        System.out.println(n.get());
        System.out.println("Finished");

Configuration for the PartitionHandler and its TaskExecutor:

    @Bean @Autowired 
    public PartitionHandler beanPartitionHandler(
        TaskExecutor beanTaskExecutor, 
        @Qualifier("beanStepSlave") Step beanStepSlave
        ) throws Exception
    {
        TaskExecutorPartitionHandler h=new TaskExecutorPartitionHandler();
        h.setGridSize(5);
        h.setTaskExecutor(beanTaskExecutor);
        h.setStep(beanStepSlave);   
        h.afterPropertiesSet(); 
        return h;
    }
    @Bean public TaskExecutor beanTaskExecutor() {
        ThreadPoolTaskExecutor e = new ThreadPoolTaskExecutor();
        e.setMaxPoolSize(5);
        e.setCorePoolSize(5);
        e.afterPropertiesSet();
        return e;
    }

The master (partitioned) step and its slave step:

    @Bean public Step beanStepMaster(
        Step beanStepSlave,
        Partitioner beanPartitioner,
        PartitionHandler beanPartitionHandler
        ) throws Exception
    {
        return stepBuilderFactory().get("stepMaster")
        .partitioner(beanStepSlave)
        .partitioner("stepSlave", beanPartitioner)
        .partitionHandler(beanPartitionHandler) // use the injected parameter; "partitionHandler" was undefined
        .build();
    }
    @Bean @Autowired 
    public Step beanStepSlave(
        ItemReader<String> beanReaderTest,
        ItemProcessor<String, String> beanProcessorTest,
        ItemWriter<String> beanWriterTest) throws Exception{
        return stepBuilderFactory().get("stepSlave")
            .<String, String>chunk(1)
            .reader(beanReaderTest)
            .processor(beanProcessorTest)
            .writer(beanWriterTest)
            .build();
    }

My pom.xml file

<dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>RELEASE</version>
      <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>4.2.3.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>4.2.3.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-tx</artifactId>
        <version>4.2.3.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-core</artifactId>
        <version>RELEASE</version>      
    </dependency>
    <dependency>
        <groupId>org.springframework.retry</groupId>
        <artifactId>spring-retry</artifactId>
        <version>1.1.2.RELEASE</version>
    </dependency>
    
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-beans</artifactId>
        <version>RELEASE</version>
    </dependency>
</dependencies>
asked Nov 26 '15 at 22:11 by wokmi


3 Answers

I also had difficulty with my partitioned Spring batch application hanging on completion when I used a ThreadPoolTaskExecutor. In addition, I saw that the executor was not allowing the work of all the partitions to finish.

I found two ways of solving those issues.

The first solution is to use a SimpleAsyncTaskExecutor instead of a ThreadPoolTaskExecutor. SimpleAsyncTaskExecutor starts a new thread for each task, so if you do not mind the extra overhead of creating threads, this is a simple fix.
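To see why this switch helps, here is a plain-JDK sketch of thread-per-task execution, the behaviour SimpleAsyncTaskExecutor provides. It is an analogue, not Spring's class: the executor is a hypothetical lambda that starts one fresh thread per task, and `ThreadPerTaskDemo` is an illustrative name. Each thread terminates as soon as its task returns, so no idle threads are left behind to keep the JVM alive.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPerTaskDemo {

    /** Returns {threads started, tasks completed, threads still alive afterwards}. */
    public static int[] run(int partitions) throws InterruptedException {
        List<Thread> started = new CopyOnWriteArrayList<>();
        AtomicInteger completed = new AtomicInteger();

        // Hypothetical thread-per-task executor: every task gets a brand-new thread,
        // and that thread ends as soon as its single task returns.
        Executor executor = task -> {
            Thread t = new Thread(task);
            started.add(t);
            t.start();
        };

        for (int i = 0; i < partitions; i++) {
            executor.execute(completed::incrementAndGet); // stand-in for one partition's work
        }

        // Wait for every partition; join() returns once the thread has terminated.
        for (Thread t : started) {
            t.join();
        }

        int alive = 0;
        for (Thread t : started) {
            if (t.isAlive()) {
                alive++;
            }
        }
        return new int[] {started.size(), completed.get(), alive};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run(5);
        System.out.printf("threads=%d completed=%d stillAlive=%d%n", r[0], r[1], r[2]);
    }
}
```

The trade-off the answer mentions is visible here: a thread is created per task instead of being reused, but nothing outlives the work.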

The second solution is to create a JobExecutionListener that calls shutdown() on the ThreadPoolTaskExecutor.

I created a JobExecutionListener like this:

@Bean
public JobExecutionListener jobExecutionListener(ThreadPoolTaskExecutor executor) {
    return new JobExecutionListener() {
        private ThreadPoolTaskExecutor taskExecutor = executor;
        @Override
        public void beforeJob(JobExecution jobExecution) {

        }

        @Override
        public void afterJob(JobExecution jobExecution) {
            taskExecutor.shutdown();
        }
    };
}

and added it to my Job definition like this:

@Bean
public Job partitionedJob(){
    return jobBuilders.get("partitionedJob")
            .listener(jobExecutionListener(taskExecutor()))
            .start(partitionedStep())
            .build();
}
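The hang and this fix can be reproduced without Spring. ThreadPoolTaskExecutor builds a java.util.concurrent.ThreadPoolExecutor whose worker threads are non-daemon by default, and idle core threads wait for more work instead of exiting. The plain-JDK sketch below (class and method names are hypothetical) runs five tasks on a fixed pool of five threads, counts how many workers are still alive once the work is done, and then calls shutdown(), which is what the afterJob() listener above does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class PoolShutdownDemo {

    /** Returns {threads created, alive after the work, alive after shutdown}. */
    public static int[] run() throws Exception {
        List<Thread> workers = new CopyOnWriteArrayList<>();
        // Fixed pool of 5 threads, mirroring corePoolSize = maxPoolSize = 5 in the question.
        ExecutorService pool = Executors.newFixedThreadPool(5, r -> {
            Thread t = new Thread(r);
            t.setDaemon(false); // non-daemon threads prevent the JVM from exiting
            workers.add(t);
            return t;
        });

        // Run 5 "partitions" and wait for all of them, as the partition handler does.
        List<Future<?>> results = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            results.add(pool.submit(() -> { /* partition work would go here */ }));
        }
        for (Future<?> f : results) {
            f.get();
        }
        // All work is finished, yet the idle workers are still parked waiting for tasks.
        int aliveAfterWork = countAlive(workers);

        // What the afterJob() listener does: shut the pool down so idle workers exit.
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        for (Thread t : workers) {
            t.join(5000);
        }
        int aliveAfterShutdown = countAlive(workers);

        return new int[] {workers.size(), aliveAfterWork, aliveAfterShutdown};
    }

    private static int countAlive(List<Thread> threads) {
        int n = 0;
        for (Thread t : threads) {
            if (t.isAlive()) {
                n++;
            }
        }
        return n;
    }

    public static void main(String[] args) throws Exception {
        int[] r = run();
        System.out.printf("created=%d aliveAfterWork=%d aliveAfterShutdown=%d%n", r[0], r[1], r[2]);
    }
}
```

Without the shutdown() call, main() would return while five non-daemon workers are still waiting for work, which matches the symptom described in the question.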
answered Oct 14 '22 at 06:10 by cschmelling


All of the other answers are hacks/workarounds. The root cause of the issue in the question is that the ThreadPoolTaskExecutor does not share the lifecycle of the step. When the step/job context is destroyed, the thread pool is not destroyed with it, so its threads keep running forever. Putting the ThreadPoolTaskExecutor in step scope with @StepScope should do the trick: Spring then takes care of destroying it when the step ends.

@Bean @StepScope
public ThreadPoolTaskExecutor threadPoolTaskExecutor() { /* ... executor configuration not shown in the original answer ... */ }
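The lifecycle this answer relies on can be illustrated in plain Java. This is only an analogy, assuming that Spring runs a destruction callback for a step-scoped bean when the step ends and that ThreadPoolTaskExecutor's destroy() shuts its pool down. `StepScopedExecutor` is a hypothetical stand-in for the scoped bean, and the try-with-resources block plays the part of the step.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ScopedExecutorDemo {

    /** Hypothetical stand-in for a step-scoped executor bean: it lives only as long as the step. */
    static final class StepScopedExecutor implements AutoCloseable {
        final ExecutorService pool = Executors.newFixedThreadPool(5);

        /** Plays the role of the destruction callback that runs when the scope ends. */
        @Override
        public void close() throws InterruptedException {
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    /** Runs one "step" and reports whether its executor is terminated once the step is over. */
    public static boolean runStep() throws Exception {
        StepScopedExecutor scoped = new StepScopedExecutor(); // created when the step starts
        try (StepScopedExecutor executor = scoped) {
            Future<?> partition = executor.pool.submit(() -> { /* partition work */ });
            partition.get();
        } // leaving the block corresponds to the step finishing: close() runs here
        return scoped.pool.isTerminated();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("executor terminated after step: " + runStep());
    }
}
```

The design point is ownership: because the executor is created and destroyed with the step, nobody has to remember to call shutdown() from a listener.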

answered Oct 14 '22 at 07:10 by nishanth arun


There are 2 solutions to your problem, although I don't know the cause.

First, you can launch the Job with a CommandLineJobRunner. This class automatically exits the program at the end of the job and converts the job's ExitStatus into a process return code (COMPLETED = 0, FAILED = 1, ...). The default return codes are provided by a SimpleJvmExitCodeMapper.

The second solution is to call System.exit() yourself after JobLauncher.run(). You can also convert the Job's ExitStatus into a return code manually and pass it to System.exit():

// Create Job
JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
Job job = (Job) context.getBean(jobName);

// Create return codes mapper
SimpleJvmExitCodeMapper mapper = new SimpleJvmExitCodeMapper();

// Start Job
JobExecution execution = jobLauncher.run(job, new JobParameters());

// Close context
context.close();

// Map codes and exit
String status = execution.getExitStatus().getExitCode();
Integer returnCode = mapper.intValue(status);
System.exit(returnCode);
answered Oct 14 '22 at 06:10 by Thrax