As we can see in Spring batch official documentation "Passing Data to Future Steps - Spring Batch" is possible but most of us have been struggling with it because in the official documentation they have mentioned the two possibilities. Step level and one at the Job level. The problem is how to retrieve the data at the step level?
My solution was the same as they related in official documentation but it didn't work. So I decided to do the following:
1- Create bean for promotion listener:
<beans:bean id="promotionListener"
class="org.springframework.batch.core.listener.ExecutionContextPromotionListener">
<beans:property name="keys" value="someValues"/>
</beans:bean>
2- Set the listener in your step (The step you wish to save the data)
<listeners>
<listener ref="promotionListener"/>
</listeners>
3- In the same step (the step in which the data will be saved) in your writer, you save the data.
private StepExecution stepExecution;
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
ExecutionContext executionContext = stepExecution.getExecutionContext();
Map<String, String> fieldsMap= new HashMap<>();
executionContext.put("keys", someValues);
}
@Override
public void write(List<? extends Map<String, String>> items) throws Exception {
LOGGER.info(items.toString());
Map<String, String> fieldsMap= new ConcurrentHashMap<>();
items.forEach(item -> item.forEach(fieldsMap::put));
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("keys", fieldsMap);
}
You can see, in my case I am saving the data in a Map (ConcurrentHashMap).
4- IMPORTANT: In the next step, you want to retrieve the data that we saved in the previous step. In that order, we must do:
Declare the object which will hold the value that we will retrieve as:
private Map fieldsMap;
Pay attention to JobExecution
@BeforeStep
public void retrieveInterStepData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
Collection<StepExecution> stepExecutions = jobExecution.getStepExecutions();
for (StepExecution steps : stepExecutions) {
ExecutionContext executionContext = steps.getExecutionContext();
if (executionContext.containsKey("keys")) {
this.nationalityMap = (Map<String, String>) executionContext.get("keys");
}
}
}
That's it! You may wonder why I didn't follow the way it is written in official documentation? The reason is that I am working with Steps in the same job. They share the same job execution. Now take a look at the picture of my debugging mode.
Suggestion please if there is another way.
NOTE: Please don't just copy and paste code form official documentation, provide your own answer or implementation.
The link of spring batch documentation which relates about this issue is below enter link description here
Passing data between steps. In Spring Batch, ExecutionContext of execution context that can be used in the scope of each step and job is provided. By using the execution context, data can be shared between the components in the step.
When the job having error, is to be recovered by only re-running the target job, tasklet model can be chooseed to make recovery simple. In chunk model, it should be dealt by returning the processed data to the state before executing the job and by creating a job to process only the unprocessed data.
An ExecutionContext is a set of key-value pairs containing information that is scoped to either StepExecution or JobExecution . Spring Batch persists the ExecutionContext , which helps in cases where you want to restart a batch run (e.g., when a fatal error has occurred, etc.).
Using Custom SkipPolicy For that purpose, Spring Batch framework provides the SkipPolicy interface. We can then provide our own implementation of skip logic and plug it into our step definition.
You are confusing the keys to be promoted from the step execution context to the job execution context with data itself. This confusion comes from two places:
<beans:property name="keys" value="someValues"/>
: someValues
should be someKeys
executionContext.put("keys", someValues);
in the @BeforeStep
is incorrectLet me make this a bit clearer. Imagine you have a job with two steps:
In this case, you can use the promotion listener to promote the key "count" from the step execution context of step 1 to the job execution context so that step 2 can access it. Here is an example:
import java.util.Arrays;
import java.util.List;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBatchProcessing
public class MyJob {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public ItemReader<Integer> itemReader() {
return new ListItemReader<>(Arrays.asList(1, 2, 3, 4));
}
@Bean
public ItemWriter<Integer> itemWriter() {
return new ItemWriter<Integer>() {
private StepExecution stepExecution;
@Override
public void write(List<? extends Integer> items) {
for (Integer item : items) {
System.out.println("item = " + item);
}
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
int count = stepContext.containsKey("count") ? stepContext.getInt("count") : 0;
stepContext.put("count", count + items.size());
}
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
};
}
@Bean
public Step step1() {
return steps.get("step1")
.<Integer, Integer>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.listener(promotionListener())
.build();
}
@Bean
public Step step2() {
return steps.get("step2")
.tasklet((contribution, chunkContext) -> {
// retrieve the key from the job execution context
Integer count = (Integer) chunkContext.getStepContext().getJobExecutionContext().get("count");
System.out.println("In step 2: step 1 wrote " + count + " items");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] {"count"});
return listener;
}
@Bean
public Job job() {
return jobs.get("job")
.start(step1())
.next(step2())
.build();
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
}
}
This prints:
item = 1
item = 2
item = 3
item = 4
In step 2: step 1 wrote 4 items
Hope this helps.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With