How to safely pass params from Tasklet to step when running parallel jobs

I am trying to safely pass parameters from a tasklet to a step within the same job.

My job consists of three tasklets (step1, step2, step3), one after another, followed by step4 (a reader, processor, and writer).

This job is executed many times in parallel.
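
For context, a minimal sketch of how such repeated launches are typically kept distinct; the run.id parameter name and the jobLauncher reference are assumptions, not from the question:

    // Hedged sketch: give each parallel run unique JobParameters so Spring Batch
    // creates a separate JobInstance (with its own ExecutionContexts) per run.
    JobParameters params = new JobParametersBuilder()
            .addLong("run.id", System.nanoTime())
            .toJobParameters();
    jobLauncher.run(processFileJob, params);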

In step1, inside the tasklet, I evaluate a parameter (hashId) via a web service, then pass it along the chain until my reader (which is in step4).

In step3 I create a new parameter called filePath, which is based on hashId, and send it over to step4 (the reader) as the file resource location.

I am using the StepExecution's ExecutionContext to pass these parameters (hashId and filePath).

I tried three ways of doing it via the tasklet:

To pass the parameter (hashId from step1 into step2, and from step2 into step3) I do this:

chunkContext.getStepContext()
        .getStepExecution()
        .getExecutionContext()
        .put("hashId", hashId);

In step3 I populate filePath based on hashId and pass it this way to my last step (the reader, processor, and writer):

    public class DownloadFileTasklet implements Tasklet, StepExecutionListener {

        private StepExecution stepExecution;

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws IOException {

            String hashId = (String) chunkContext.getStepContext()
                    .getStepExecution()
                    .getJobExecution()
                    .getExecutionContext()
                    .get("hashId");

            ...

            filePath = "..." + hashId + ".csv";
            // I used executionContextPromotionListener here in order to promote those keys

            chunkContext.getStepContext()
                    .getStepExecution()
                    .getExecutionContext()
                    .put("filePath", filePath);

            logger.info(filePath + " for hashId=" + hashId);

            return RepeatStatus.FINISHED;
        }

        @Override
        public void beforeStep(StepExecution stepExecution) {
            this.stepExecution = stepExecution;
        }
    }

Note that I print the hashId and filePath values right before that step (step3) finishes. According to the logs, they are consistent and populated as expected.

I also added logging within my reader to print the parameters that I receive:

    @Bean
    @StepScope
    public ItemStreamReader<MyDTO> reader(@Value("#{jobExecutionContext[filePath]}") String filePath) {
        logger.info("test filePath=" + filePath);

        return itemReader;
    }
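
For illustration only, one common way such a step-scoped reader is wired up; FlatFileItemReader and the lineMapper() helper are assumptions, not taken from the question:

    // Hypothetical sketch of a concrete step-scoped reader. The late-bound
    // filePath is resolved from the job ExecutionContext when the step starts.
    @Bean
    @StepScope
    public ItemStreamReader<MyDTO> reader(@Value("#{jobExecutionContext[filePath]}") String filePath) {
        FlatFileItemReader<MyDTO> reader = new FlatFileItemReader<>();
        reader.setResource(new FileSystemResource(filePath));
        reader.setLineMapper(lineMapper()); // hypothetical helper returning a LineMapper<MyDTO>
        return reader;
    }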

When I execute this job ~10 times in parallel, I can see that the filePath parameter gets populated with other jobs' filePath values.

This is how I promote the keys to the job with the ExecutionContextPromotionListener:

Job definition:

    @Bean
    public Job processFileJob() throws Exception {
        return this.jobs.get("processFileJob")
                .start(step1)
                .next(step2)
                .next(downloadFileTaskletStep())   // step3
                .next(processSnidFileStep())       // step4
                .build();
    }

Step 3 definition:

    public Step downloadFileTaskletStep() {
        return this.steps.get("downloadFileTaskletStep")
                .tasklet(downloadFileTasklet())
                .listener(executionContextPromotionListener())
                .build();
    }


    @Bean
    public ExecutionContextPromotionListener executionContextPromotionListener() {
        ExecutionContextPromotionListener executionContextPromotionListener = new ExecutionContextPromotionListener();
        executionContextPromotionListener.setKeys(new String[]{"filePath"});
        return executionContextPromotionListener;
    }
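
As an aside, only filePath is listed for promotion here. If hashId is also meant to travel between steps this way (instead of being written straight into the job context), it would have to be listed too; a hypothetical variant:

    // Hypothetical variant: promote both keys from the step ExecutionContext
    // to the job ExecutionContext when the step completes.
    @Bean
    public ExecutionContextPromotionListener executionContextPromotionListener() {
        ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
        listener.setKeys(new String[]{"filePath", "hashId"});
        return listener;
    }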

Same result: the threads mix up the parameters.

I can track the results via the Spring Batch database table batch_job_execution_context.short_context:

Here you can see that the filePath, which is built from the hashId, does not match the original hashId (incorrect record):

{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","/etc/mydir/services/notification_processor/files/2015_04_22/f1c7b0f2180b7e266d36f87fcf6fb7aa.csv"]},{"string":["hashId","20df39d201fffc7444423cfdf2f43789"]}]}]}

The other records look fine, but there are always one or two that are messed up.

Correct records:

{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**c490c8282628b894727fc2a4d6fc0cb5**.csv"]},{"string":["hashId","**c490c8282628b894727fc2a4d6fc0cb5**"]}]}]}

{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**2b21d3047208729192b87e90e4a868e4**.csv"]},{"string":["hashId","**2b21d3047208729192b87e90e4a868e4**"]}]}]}   

Any idea why I am having these threading issues?

asked Apr 21 '15 by rayman

1 Answer

To review your attempted methods:

  • Method 1 - Editing JobParameters. JobParameters are immutable within a job, so modifying them during job execution should not be attempted.
  • Method 2 - Editing JobParameters v2. Method 2 is really the same as method 1; you're just getting the reference to the JobParameters a different way.
  • Method 3 - Using the ExecutionContextPromotionListener. This is the correct way, but you're doing things incorrectly. The ExecutionContextPromotionListener looks at the step's ExecutionContext and copies the keys you specify over to the job's ExecutionContext. You're adding the keys directly to the job's ExecutionContext, which is a bad idea.

So in short, Method 3 is the closest to correct, but you should be adding the properties you want to share to the step's ExecutionContext and then configuring the ExecutionContextPromotionListener to promote the appropriate keys to the job's ExecutionContext.

The code would be updated as follows:

chunkContext.getStepContext()
            .getStepExecution()
            .getExecutionContext()
            .put("filePath", filePath);
answered Oct 21 '22 by Michael Minella