Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How can you restart a failed spring batch job and let it pick up where it left off?

According to the Spring Batch documentation restarting of a job is supported out of the box but I cannot get it to start from where it left of. e.g. If my step processed 10 records it should start at record 11 with processing whenever I restart it. In practice this doesn't happen. It reads from the beginnen en reprocesses everything.

Does anybody have a Java config based configuration of a simple job that reads a delimited file and writes the content to a db table that can be restarted from the point it stopped?

@Configuration
public class BatchConfiguration {

    @Value("${spring-batch.databaseType}")
    private String databaseType;

    @Value("${spring-batch.databaseSchema}")
    private String schemaName;

    @Bean
    public JobBuilderFactory jobBuilderFactory(final JobRepository jobRepository) {
        return new JobBuilderFactory(jobRepository);
    }

    @Bean
    public StepBuilderFactory stepBuilderFactory(final JobRepository jobRepository,
        final PlatformTransactionManager transactionManager) {
        return new StepBuilderFactory(jobRepository, transactionManager);
    }

    @Bean
    public JobRepository jobRepository(final DataSource dataSource, final PlatformTransactionManager transactionManager) {

        final JobRepositoryFactoryBean bean = new JobRepositoryFactoryBean();
        bean.setDatabaseType(databaseType);
        bean.setDataSource(dataSource);
        if (StringUtils.isNotBlank(schemaName)) {
            bean.setTablePrefix(schemaName);
        }
        bean.setTransactionManager(transactionManager);
        try {
            bean.afterPropertiesSet();
            return bean.getObject();
        } catch (final Exception e) {
            throw new BatchConfigurationException("Invalid batch job repository configuration.", e);
        }
    }

    @Bean
    public JobLauncher jobLauncher(final JobRepository jobRepository) {

        final SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        return jobLauncher;
    }

}

@Configuration
@EnableScheduling
@ComponentScan("com.some.package")
public class BatchJobConfiguration {

    @Resource
    private JobBuilderFactory jobBuilderFactory;

    @Resource
    private StepBuilderFactory stepBuilderFactory;

    @Value("${savings-transaction.file}")
    private String savingsTransactionFile;

    @Value("${savings-balance.file}")
    private String savingsBalanceFile;

    @Value("${processed-directory}")
    private String processedDirectory;

    private static final Integer IMPORT_CHUNKSIZE = 10;

    @Bean
    @DependsOn("stepBuilderFactory")
    public Step savingsTransactionStep(final PlatformTransactionManager transactionManager,
            @Qualifier("savingsTransactionItemReader") final ItemReader<SavingsTransactionItem> savingsTransactionItemReader,
            @Qualifier("savingsTransactionProcessor") final ItemProcessor<SavingsTransactionItem, SavingsTransaction> processor,
            @Qualifier("savingsTransactionItemWriter") final ItemWriter<SavingsTransaction> savingsTransactionItemWriter,
            @Qualifier("savingsTransactionStepListener") final SavingsTransactionStepListener listener) {

        return stepBuilderFactory.get("savingsTransactionStep")
                .transactionManager(transactionManager)
                .<SavingsTransactionItem, SavingsTransaction> chunk(IMPORT_CHUNKSIZE)
                .reader(savingsTransactionItemReader)
                .processor(processor)
                .writer(savingsTransactionItemWriter)
                .listener(listener)
                .build();
    }

    @Bean
    public Step savingsTransactionCleanUpStep(final PlatformTransactionManager transactionManager,
            final JobRepository jobRepository) {

        final TaskletStep taskletStep = new TaskletStep("savingsTransactionCleanUpStep");

        final FileMovingTasklet tasklet = new FileMovingTasklet();
        tasklet.setFileNamePattern(savingsTransactionFile);
        tasklet.setProcessedDirectory(processedDirectory);
        taskletStep.setTasklet(tasklet);
        taskletStep.setTransactionManager(transactionManager);
        taskletStep.setJobRepository(jobRepository);
        try {
            taskletStep.afterPropertiesSet();
        } catch (final Exception e) {
            throw new BatchConfigurationException("Failed to configure tasklet!", e);
        }

        return taskletStep;
    }

    @Bean
    @DependsOn("jobBuilderFactory")
    public Job job(final Step savingsTransactionStep,
            final Step savingsTransactionCleanUpStep) {

        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .start(savingsTransactionStep)  
                .next(savingsTransactionCleanUpStep)                    
                .on("FINISHED")
                .end()
                .build()
                .build();
    }
}

Unit test code that restarts the job

    final Date now = new Date();
    jobMananger.processRegistrations(now);

    final List<SavingsBalance> savingsBalances = savingsBalanceDao.findAll();
    assertEquals(9, savingsBalances.size());

    FileUtils.moveFile(new File("target/AEA001_20160610.dat"), new File("target/AEA001_20160610_invalid.dat"));
    FileUtils.moveFile(new File("target/AEA001_20160610_valid.dat"), new File("target/AEA001_20160610.dat"));

    jobMananger.processRegistrations(now);

    final List<SavingsBalance> savingsBalances2 = savingsBalanceDao.findAll();
    System.out.println(savingsBalances2.size());
    int found = 0;
    for (final SavingsBalance savingsBalance : savingsBalances2) {

        final String id = savingsBalance.getId();
        if ("12345".equals(id)) {
            found++;
        }

    }

    assertEquals("Invalid number of found balances!", 1, found);

The job manager implementation

public class JobManager {

    @Resource
    private JobLauncher jobLauncher;

    @Resource
    private Job job;

    @Transactional(propagation = Propagation.NOT_SUPPORTED)
    public void processRegistrations(final Date date) {

        try {

            final Map<String, JobParameter> parameters = new HashMap<>();
            parameters.put("START_DATE", new JobParameter(date));

            final JobParameters jobParameters = new JobParameters(parameters);
            final JobExecution execution = jobLauncher.run(job, jobParameters);
            LOG.info("Exit Status : " + execution.getStatus());
        } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
                | JobParametersInvalidException e) {
            LOG.error("Failed to process registrations.", e);
        }
    }

}
like image 385
Tranquilized Avatar asked Aug 09 '16 09:08

Tranquilized


People also ask

How do you restart a failed step in Spring Batch application?

Open Spring Tool Suite, on main menu, choose File->New->Spring Starter Project, input project info. Press Next then Finish, a Spring Boot project will be created successfully.

How do I rerun a Spring Batch job?

Going by the concept of Job Instances and Job Executions in Spring Batch, you can't start a COMPLETED job instance again though you can launch same instance of job again & again till its not COMPLETE ( and few more job statuses ). Job instance uniqueness is achieved by jobId & job parameters.

Is it possible to restart a completed job in Spring Batch?

Spring Cloud Data Flow lets you restart a Spring Batch Job. For instance, if a Spring Batch Job fails to run, you can restarted it from the SCDF dashboard so that the batch-job can pick up the work where it left off.

How do I restart a batch job?

To restart a stopped task, update the task status to Pending by either: Sending an updateTask XML transaction request to InfoSphere® MDM with a start task action code. Running the command runbatch.sh -start <processId>


Video Answer


2 Answers

Inside your JobManager class , instead of using JobLauncher , use JobOperator.restart() nethod .

The reason why your job is not getting restarted from the last failed step is because with JobLauncher you are again starting one more new job and hence it is starting the job from the step one .

Please make sure that "restartable" property is set to true (By default it is set to true ) .

Here is the sample code .

public boolean resumeWorkflow(long executionId)
        throws WorkflowResumeServiceException {
    JobOperator jobOperator = (JobOperator) ApplicationContextProvider.getApplicationContext().getBean("jobOperator");


    try 
    {

        LOGGER.info("SUMMARY AFTER RESTART:" + jobOperator.getSummary(executionId));
        jobOperator.restart(executionId);
    }
}

You need to get the jobExecutionid of the failed job and pass it to the above method .

Please note that a job which is completed with "FINISHED" status can not be restarted .

You can read this post also Restarting a job

like image 37
DevG Avatar answered Sep 21 '22 06:09

DevG


It seems you have to configure the following beans to be able to restart your job.

@Bean
public JobOperator jobOperator(final JobLauncher jobLauncher, final JobRepository jobRepository,
        final JobRegistry jobRegistry, final JobExplorer jobExplorer) {
    final SimpleJobOperator jobOperator = new SimpleJobOperator();
    jobOperator.setJobLauncher(jobLauncher);
    jobOperator.setJobRepository(jobRepository);
    jobOperator.setJobRegistry(jobRegistry);
    jobOperator.setJobExplorer(jobExplorer);
    return jobOperator;
}

@Bean
public JobExplorer jobExplorer(final DataSource dataSource) throws Exception {
    final JobExplorerFactoryBean bean = new JobExplorerFactoryBean();
    bean.setDataSource(dataSource);
    bean.setTablePrefix("BATCH_");
    bean.setJdbcOperations(new JdbcTemplate(dataSource));
    bean.afterPropertiesSet();
    return bean.getObject();
}

Then you need to retrieve the batch instance id from the batch tables to be able to restart that specific instance by using the jobOperator.

final Long restartId = jobOperator.restart(id);
final JobExecution restartExecution = jobExplorer.getJobExecution(restartId);
like image 183
Tranquilized Avatar answered Sep 19 '22 06:09

Tranquilized