Passing Data to Future Steps - Spring Batch

The official Spring Batch documentation has a section called "Passing Data to Future Steps", so this is clearly possible. Still, many of us struggle with it, because the documentation mentions two execution contexts: one at the step level and one at the job level. The problem is: how do you retrieve data that was saved at the step level?

My first attempt was the same as what the official documentation describes, but it didn't work. So I decided to do the following:

1- Create a bean for the promotion listener:

<beans:bean id="promotionListener"
                class="org.springframework.batch.core.listener.ExecutionContextPromotionListener">
        <beans:property name="keys" value="someValues"/>
</beans:bean>

2- Register the listener on your step (the step in which you want to save the data):

        <listeners>
            <listener ref="promotionListener"/>
        </listeners>

3- In the writer of that same step (the step in which the data will be saved), save the data:

private StepExecution stepExecution;

@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
    this.stepExecution = stepExecution;

    ExecutionContext executionContext = stepExecution.getExecutionContext();
    Map<String, String> fieldsMap = new HashMap<>();
    executionContext.put("keys", someValues);
}

@Override
public void write(List<? extends Map<String, String>> items) throws Exception {
    LOGGER.info(items.toString());

    Map<String, String> fieldsMap= new ConcurrentHashMap<>();
    items.forEach(item -> item.forEach(fieldsMap::put));

    ExecutionContext stepContext = this.stepExecution.getExecutionContext();
    stepContext.put("keys", fieldsMap);
}

As you can see, in my case I am saving the data in a Map (a ConcurrentHashMap).

4- IMPORTANT: In the next step, we want to retrieve the data that we saved in the previous step. To do that, we must:

  • Declare the field that will hold the retrieved value:

    private Map fieldsMap;

Pay attention to JobExecution

[Image: screenshot from the debugger]

@BeforeStep
public void retrieveInterStepData(StepExecution stepExecution) {
    // All steps of the same job share one JobExecution
    JobExecution jobExecution = stepExecution.getJobExecution();
    Collection<StepExecution> stepExecutions = jobExecution.getStepExecutions();
    // Look through every step's execution context for the saved entry
    for (StepExecution steps : stepExecutions) {
        ExecutionContext executionContext = steps.getExecutionContext();
        if (executionContext.containsKey("keys")) {
            this.fieldsMap = (Map<String, String>) executionContext.get("keys");
        }
    }
}

That's it! You may wonder why I didn't follow the approach in the official documentation. The reason is that I am working with steps in the same job, so they share the same job execution. Take a look at the screenshot from my debugging session.
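The lookup in retrieveInterStepData can be modelled without the framework to show what it relies on. This is only a sketch: plain maps stand in for each step's ExecutionContext, and the class and method names (StepScanSketch, findInSteps) are hypothetical, not part of Spring Batch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Framework-free model of scanning every step's context for a key.
// Plain maps stand in for ExecutionContext; all names here are hypothetical.
class StepScanSketch {

    // Returns the value stored under `key` by the last context (in iteration
    // order) that contains it, or null if no context has the key.
    static Object findInSteps(List<Map<String, Object>> stepContexts, String key) {
        Object found = null;
        for (Map<String, Object> context : stepContexts) {
            if (context.containsKey(key)) {
                found = context.get(key); // no break: a later match overwrites an earlier one
            }
        }
        return found;
    }

    public static void main(String[] args) {
        Map<String, Object> step1 = new HashMap<>();
        step1.put("keys", "written by step 1");
        Map<String, Object> step2 = new HashMap<>(); // step 2 stored nothing

        List<Map<String, Object>> stepContexts = new ArrayList<>();
        stepContexts.add(step1);
        stepContexts.add(step2);

        System.out.println(findInSteps(stepContexts, "keys")); // prints: written by step 1
    }
}
```

One consequence of this design is that the reading step has to know that some earlier step wrote the key, and if two steps write the same key, the later match silently wins.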

Suggestions are welcome if there is another way.

NOTE: Please don't just copy and paste code from the official documentation; provide your own answer or implementation.

The section of the Spring Batch documentation that covers this issue is "Passing Data to Future Steps".

Asked by Eddy Bayonne on Sep 11 '18 at 08:09


1 Answer

You are confusing the keys to be promoted from the step execution context to the job execution context with the data itself. This confusion shows up in two places:

  • <beans:property name="keys" value="someValues"/>: someValues should be someKeys, because this property lists key names, not values
  • calling executionContext.put("keys", someValues); in the @BeforeStep method is incorrect

Let me make this a bit clearer. Imagine you have a job with two steps:

  • step 1: reads/writes some items and writes the item count to the step execution context
  • step 2: needs to access the item count and print it to the console.

In this case, you can use the promotion listener to promote the key "count" from the step execution context of step 1 to the job execution context so that step 2 can access it. Here is an example:

import java.util.Arrays;
import java.util.List;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class MyJob {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public ItemReader<Integer> itemReader() {
        return new ListItemReader<>(Arrays.asList(1, 2, 3, 4));
    }

    @Bean
    public ItemWriter<Integer> itemWriter() {
        return new ItemWriter<Integer>() {

            private StepExecution stepExecution;

            @Override
            public void write(List<? extends Integer> items) {
                for (Integer item : items) {
                    System.out.println("item = " + item);
                }
                ExecutionContext stepContext = this.stepExecution.getExecutionContext();
                int count = stepContext.containsKey("count") ? stepContext.getInt("count") : 0;
                stepContext.put("count", count + items.size());
            }

            @BeforeStep
            public void saveStepExecution(StepExecution stepExecution) {
                this.stepExecution = stepExecution;
            }
        };
    }

    @Bean
    public Step step1() {
        return steps.get("step1")
                .<Integer, Integer>chunk(2)
                .reader(itemReader())
                .writer(itemWriter())
                .listener(promotionListener())
                .build();
    }

    @Bean
    public Step step2() {
        return steps.get("step2")
                .tasklet((contribution, chunkContext) -> {
                    // retrieve the key from the job execution context
                    Integer count = (Integer) chunkContext.getStepContext().getJobExecutionContext().get("count");
                    System.out.println("In step 2: step 1 wrote " + count + " items");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public ExecutionContextPromotionListener promotionListener() {
        ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
        listener.setKeys(new String[] {"count"});
        return listener;
    }

    @Bean
    public Job job() {
        return jobs.get("job")
                .start(step1())
                .next(step2())
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

}

This prints:

item = 1
item = 2
item = 3
item = 4
In step 2: step 1 wrote 4 items
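To make the distinction between keys and data concrete, here is a simplified, framework-free model of what promotion amounts to: after the step finishes, the entries whose names are listed in keys are copied from the step-scoped context to the job-scoped context. This is a sketch, not Spring Batch's implementation; plain maps stand in for ExecutionContext, and PromotionSketch and promote are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of key promotion between two contexts.
// Plain maps stand in for ExecutionContext; all names here are hypothetical.
class PromotionSketch {

    // Copies only the entries whose key is listed in `keys`
    // from the step-scoped map to the job-scoped map.
    static void promote(List<String> keys,
                        Map<String, Object> stepContext,
                        Map<String, Object> jobContext) {
        for (String key : keys) {
            if (stepContext.containsKey(key)) {
                jobContext.put(key, stepContext.get(key));
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> stepContext = new HashMap<>();
        Map<String, Object> jobContext = new HashMap<>();

        // Step 1 stores its data under the key "count".
        stepContext.put("count", 4);
        stepContext.put("scratch", "not promoted");

        // The listener is configured with key names, never with the data itself.
        promote(Arrays.asList("count"), stepContext, jobContext);

        System.out.println(jobContext); // prints: {count=4}
    }
}
```

In this model, anything stored under a key that is not listed (here "scratch") stays in the step context and is invisible to later steps, which is why the name used in put(...) has to match a name configured in keys.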

Hope this helps.

Answered by Mahmoud Ben Hassine on Sep 28 '22 at 19:09