According to the Spring Batch documentation, restarting a job is supported out of the box, but I cannot get it to start from where it left off. For example, if my step processed 10 records, it should resume at record 11 when I restart it. In practice this doesn't happen: it reads from the beginning and reprocesses everything.
Does anybody have a Java-config-based configuration of a simple job that reads a delimited file and writes the content to a db table, and that can be restarted from the point where it stopped?
@Configuration
public class BatchConfiguration {

    @Value("${spring-batch.databaseType}")
    private String databaseType;

    @Value("${spring-batch.databaseSchema}")
    private String schemaName;

    @Bean
    public JobBuilderFactory jobBuilderFactory(final JobRepository jobRepository) {
        return new JobBuilderFactory(jobRepository);
    }

    @Bean
    public StepBuilderFactory stepBuilderFactory(final JobRepository jobRepository,
            final PlatformTransactionManager transactionManager) {
        return new StepBuilderFactory(jobRepository, transactionManager);
    }

    @Bean
    public JobRepository jobRepository(final DataSource dataSource, final PlatformTransactionManager transactionManager) {
        final JobRepositoryFactoryBean bean = new JobRepositoryFactoryBean();
        bean.setDatabaseType(databaseType);
        bean.setDataSource(dataSource);
        if (StringUtils.isNotBlank(schemaName)) {
            bean.setTablePrefix(schemaName);
        }
        bean.setTransactionManager(transactionManager);
        try {
            bean.afterPropertiesSet();
            return bean.getObject();
        } catch (final Exception e) {
            throw new BatchConfigurationException("Invalid batch job repository configuration.", e);
        }
    }

    @Bean
    public JobLauncher jobLauncher(final JobRepository jobRepository) {
        final SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        return jobLauncher;
    }
}
@Configuration
@EnableScheduling
@ComponentScan("com.some.package")
public class BatchJobConfiguration {

    @Resource
    private JobBuilderFactory jobBuilderFactory;

    @Resource
    private StepBuilderFactory stepBuilderFactory;

    @Value("${savings-transaction.file}")
    private String savingsTransactionFile;

    @Value("${savings-balance.file}")
    private String savingsBalanceFile;

    @Value("${processed-directory}")
    private String processedDirectory;

    private static final Integer IMPORT_CHUNKSIZE = 10;

    @Bean
    @DependsOn("stepBuilderFactory")
    public Step savingsTransactionStep(final PlatformTransactionManager transactionManager,
            @Qualifier("savingsTransactionItemReader") final ItemReader<SavingsTransactionItem> savingsTransactionItemReader,
            @Qualifier("savingsTransactionProcessor") final ItemProcessor<SavingsTransactionItem, SavingsTransaction> processor,
            @Qualifier("savingsTransactionItemWriter") final ItemWriter<SavingsTransaction> savingsTransactionItemWriter,
            @Qualifier("savingsTransactionStepListener") final SavingsTransactionStepListener listener) {
        return stepBuilderFactory.get("savingsTransactionStep")
                .transactionManager(transactionManager)
                .<SavingsTransactionItem, SavingsTransaction>chunk(IMPORT_CHUNKSIZE)
                .reader(savingsTransactionItemReader)
                .processor(processor)
                .writer(savingsTransactionItemWriter)
                .listener(listener)
                .build();
    }

    @Bean
    public Step savingsTransactionCleanUpStep(final PlatformTransactionManager transactionManager,
            final JobRepository jobRepository) {
        final TaskletStep taskletStep = new TaskletStep("savingsTransactionCleanUpStep");
        final FileMovingTasklet tasklet = new FileMovingTasklet();
        tasklet.setFileNamePattern(savingsTransactionFile);
        tasklet.setProcessedDirectory(processedDirectory);
        taskletStep.setTasklet(tasklet);
        taskletStep.setTransactionManager(transactionManager);
        taskletStep.setJobRepository(jobRepository);
        try {
            taskletStep.afterPropertiesSet();
        } catch (final Exception e) {
            throw new BatchConfigurationException("Failed to configure tasklet!", e);
        }
        return taskletStep;
    }

    @Bean
    @DependsOn("jobBuilderFactory")
    public Job job(final Step savingsTransactionStep,
            final Step savingsTransactionCleanUpStep) {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .start(savingsTransactionStep)
                .next(savingsTransactionCleanUpStep)
                .on("FINISHED")
                .end()
                .build()
                .build();
    }
}
Unit test code that restarts the job
final Date now = new Date();
jobMananger.processRegistrations(now);
final List<SavingsBalance> savingsBalances = savingsBalanceDao.findAll();
assertEquals(9, savingsBalances.size());

FileUtils.moveFile(new File("target/AEA001_20160610.dat"), new File("target/AEA001_20160610_invalid.dat"));
FileUtils.moveFile(new File("target/AEA001_20160610_valid.dat"), new File("target/AEA001_20160610.dat"));

jobMananger.processRegistrations(now);
final List<SavingsBalance> savingsBalances2 = savingsBalanceDao.findAll();
System.out.println(savingsBalances2.size());

int found = 0;
for (final SavingsBalance savingsBalance : savingsBalances2) {
    final String id = savingsBalance.getId();
    if ("12345".equals(id)) {
        found++;
    }
}
assertEquals("Invalid number of found balances!", 1, found);
The job manager implementation
public class JobManager {

    @Resource
    private JobLauncher jobLauncher;

    @Resource
    private Job job;

    @Transactional(propagation = Propagation.NOT_SUPPORTED)
    public void processRegistrations(final Date date) {
        try {
            final Map<String, JobParameter> parameters = new HashMap<>();
            parameters.put("START_DATE", new JobParameter(date));
            final JobParameters jobParameters = new JobParameters(parameters);

            final JobExecution execution = jobLauncher.run(job, jobParameters);
            LOG.info("Exit Status : " + execution.getStatus());
        } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
                | JobParametersInvalidException e) {
            LOG.error("Failed to process registrations.", e);
        }
    }
}
Going by the concept of Job Instances and Job Executions in Spring Batch, you can't start a COMPLETED job instance again, although you can launch the same instance again and again as long as it has not reached COMPLETED (and a few other statuses). Job instance uniqueness is determined by the job name and the identifying job parameters.
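To illustrate the rule, here is a minimal sketch (assuming the job and jobLauncher beans from the configuration above): launching again with identical identifying parameters targets the existing, failed instance, while any change in the parameters creates a new instance that starts from scratch.

// Sketch only: same identifying parameters -> same JobInstance (a restart after failure),
// different parameters -> a brand-new JobInstance.
void illustrateInstanceIdentity(final JobLauncher jobLauncher, final Job job, final Date runDate) throws Exception {
    final JobParameters params = new JobParametersBuilder()
            .addDate("START_DATE", runDate)
            .toJobParameters();

    jobLauncher.run(job, params);              // suppose this execution FAILS

    // Same parameters again: Spring Batch finds the failed instance and
    // creates a new execution for it, i.e. a restart.
    jobLauncher.run(job, params);

    // Different parameter value: a new JobInstance, nothing is resumed.
    final JobParameters other = new JobParametersBuilder()
            .addDate("START_DATE", new Date())
            .toJobParameters();
    jobLauncher.run(job, other);
}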
Spring Cloud Data Flow lets you restart a Spring Batch job. For instance, if a Spring Batch job fails, you can restart it from the SCDF dashboard so that the job picks up the work where it left off.
Inside your JobManager class, instead of using JobLauncher, use JobOperator.restart().
The reason your job is not restarted from the last failed step is that with JobLauncher you start a brand new job each time, so it runs from the first step again.
Also make sure that the "restartable" property is set to true (it is true by default).
Here is the sample code:
public boolean resumeWorkflow(long executionId) throws WorkflowResumeServiceException {
    final JobOperator jobOperator =
            (JobOperator) ApplicationContextProvider.getApplicationContext().getBean("jobOperator");
    try {
        LOGGER.info("SUMMARY BEFORE RESTART: " + jobOperator.getSummary(executionId));
        jobOperator.restart(executionId);
        return true;
    } catch (final Exception e) {
        throw new WorkflowResumeServiceException("Failed to restart job execution " + executionId, e);
    }
}
You need to get the jobExecutionId of the failed execution and pass it to the above method.
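If you do not have that id at hand, a JobExplorer can be used to look it up; a rough sketch (the jobExplorer bean and the job name passed in are assumptions based on the configuration in the question):

// Sketch: find the most recent FAILED execution of the given job via a JobExplorer.
Long findLastFailedExecutionId(final JobExplorer jobExplorer, final String jobName) {
    for (final JobInstance instance : jobExplorer.getJobInstances(jobName, 0, 10)) {
        for (final JobExecution execution : jobExplorer.getJobExecutions(instance)) {
            if (execution.getStatus() == BatchStatus.FAILED) {
                return execution.getId();
            }
        }
    }
    return null; // nothing failed recently, nothing to restart
}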
Please note that a job execution that has already completed successfully (status COMPLETED) cannot be restarted.
You can also read this post: Restarting a job
It seems you have to configure the following beans to be able to restart your job.
@Bean
public JobOperator jobOperator(final JobLauncher jobLauncher, final JobRepository jobRepository,
        final JobRegistry jobRegistry, final JobExplorer jobExplorer) {
    final SimpleJobOperator jobOperator = new SimpleJobOperator();
    jobOperator.setJobLauncher(jobLauncher);
    jobOperator.setJobRepository(jobRepository);
    jobOperator.setJobRegistry(jobRegistry);
    jobOperator.setJobExplorer(jobExplorer);
    return jobOperator;
}

@Bean
public JobExplorer jobExplorer(final DataSource dataSource) throws Exception {
    final JobExplorerFactoryBean bean = new JobExplorerFactoryBean();
    bean.setDataSource(dataSource);
    bean.setTablePrefix("BATCH_");
    bean.setJdbcOperations(new JdbcTemplate(dataSource));
    bean.afterPropertiesSet();
    return bean.getObject();
}
Then you need to retrieve the id of the failed job execution from the batch tables (or via the JobExplorer) so you can restart that specific execution with the jobOperator.
final Long restartId = jobOperator.restart(id);
final JobExecution restartExecution = jobExplorer.getJobExecution(restartId);
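Note that SimpleJobOperator can only restart a job it can find by name in the JobRegistry, so the registry passed to it has to contain your job. If nothing registers the jobs in your context yet, a sketch like the following may be needed (the classes are standard Spring Batch ones, but the wiring is an assumption about your setup):

// Sketch: a registry plus a post-processor that registers every Job bean in it,
// so that jobOperator.restart(...) can resolve the job by name.
@Bean
public JobRegistry jobRegistry() {
    return new MapJobRegistry();
}

@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(final JobRegistry jobRegistry) {
    final JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
    postProcessor.setJobRegistry(jobRegistry);
    return postProcessor;
}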