I am trying to safely pass params from a tasklet to a step within the same job.
My job consists of 3 tasklets (step1, step2, step3) one after another, followed by step4 (a reader, processor and writer).
This job is executed many times in parallel.
In step1, inside the tasklet, I evaluate a param (hashId) via a web service; then I pass it all the way down my chain until my reader (which is in step4).
In step3 I create a new param called filePath, which is based on hashId, and I send it over to step4 (the reader) as the file resource location.
I am using the StepExecution to pass these params (hashId and filePath).
I tried 3 ways of doing it via the tasklet.
To pass the param (hashId from step1 into step2, and from step2 into step3) I am doing this:
chunkContext.getStepContext()
.getStepExecution()
.getExecutionContext()
.put("hashId", hashId);
In step3 I populate filePath based on hashId and pass it this way to my last step (the reader, processor and writer):
public class DownloadFileTasklet implements Tasklet, StepExecutionListener {
..
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws IOException {
    String hashId = (String) chunkContext.getStepContext()
            .getStepExecution()
            .getJobExecution()
            .getExecutionContext()
            .get("hashId");
    ...
    filePath = "..." + hashId + ".csv";
    //I used executionContextPromotionListener here in order to promote those keys
    chunkContext.getStepContext()
            .getStepExecution()
            .getExecutionContext()
            .put("filePath", filePath);
    logger.info(filePath + " for hashId=" + hashId);
    return RepeatStatus.FINISHED;
}
@Override
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
Pay attention that I print the hashId and filePath values right before that step (step3) finishes. According to the logs they are consistent and populated as expected.
I also added logs within my reader to log the params that I get:
@Bean
@StepScope
public ItemStreamReader<MyDTO> reader(@Value("#{jobExecutionContext[filePath]}") String filePath) {
logger.info("test filePath="+filePath+");
return itemReader;
}
When I execute this job ~10 times in parallel, I can see that the filePath param gets populated with other jobs' filePath values.
This is how I promote the job's keys with the ExecutionContextPromotionListener.
Job definition:
@Bean
public Job processFileJob() throws Exception {
    return this.jobs.get("processFileJob")
            .start(step1)
            .next(step2)
            .next(downloadFileTaskletStep()) //step3
            .next(processSnidFileStep())     //step4
            .build();
}
Step 3 definition:
public Step downloadFileTaskletStep() {
    return this.steps.get("downloadFileTaskletStep")
            .tasklet(downloadFileTasklet())
            .listener(executionContextPromotionListener())
            .build();
}
@Bean
public org.springframework.batch.core.listener.ExecutionContextPromotionListener executionContextPromotionListener() {
ExecutionContextPromotionListener executionContextPromotionListener = new ExecutionContextPromotionListener();
executionContextPromotionListener.setKeys(new String[]{"filePath"});
return executionContextPromotionListener;
}
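For completeness, hashId would need the analogous promotion from step1; a hypothetical listener for that key (the bean name is an assumption, as this config is not shown above) could look like:

@Bean
public ExecutionContextPromotionListener hashIdPromotionListener() {
    // Copies "hashId" from step1's ExecutionContext to the job's
    // ExecutionContext once step1 completes
    ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
    listener.setKeys(new String[]{"hashId"});
    return listener;
}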
Same results: the threads are still messing up the params.
I can track the results via the Spring Batch database table batch_job_execution_context.short_context.
Here you can see that the filePath, which is built from the hashId, does not match the original hashId:
//incorrect record
{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","/etc/mydir/services/notification_processor/files/2015_04_22/f1c7b0f2180b7e266d36f87fcf6fb7aa.csv"]},{"string":["hashId","20df39d201fffc7444423cfdf2f43789"]}]}]}
Now if we check the other records they seem fine, but there are always one or two messed up:
//correct records
{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**c490c8282628b894727fc2a4d6fc0cb5**.csv"]},{"string":["hashId","**c490c8282628b894727fc2a4d6fc0cb5**"]}]}]}
{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**2b21d3047208729192b87e90e4a868e4**.csv"]},{"string":["hashId","**2b21d3047208729192b87e90e4a868e4**"]}]}]}
Any idea why I am having these threading issues?
To review your attempted methods:

Method 1 (JobParameters): JobParameters are immutable in a job, so attempting to modify them during job execution should not be attempted.

Method 2: really the same as method 1; you're only getting the reference to the JobParameters a different way.

Method 3 (ExecutionContextPromotionListener): this is the correct way, but you're doing things incorrectly. The ExecutionContextPromotionListener looks at the step's ExecutionContext and copies the keys you specify over to the job's ExecutionContext. You're adding the keys directly to the job's ExecutionContext, which is a bad idea.

So in short, method 3 is the closest to correct, but you should be adding the properties you want to share to the step's ExecutionContext and then configure the ExecutionContextPromotionListener to promote the appropriate keys to the job's ExecutionContext.
The code would be updated as follows:
chunkContext.getStepContext()
.getStepExecution()
.getExecutionContext()
.put("filePath", filePath);