Spring Batch custom completion policy for dynamic chunk size

Context

We have a batch job that replicates localized country names (i.e. translations of country names into different languages) from an external DB into ours. The idea was to process all localized names for a single country in one chunk (i.e. first chunk - all translations for Andorra, next chunk - all translations for the U.A.E., etc.). We use a JdbcCursorItemReader for reading the external data, plus some Oracle analytic functions to provide the total number of translations available for each country: something like

select c_lng.country_code, c_lng.language_code, c_lng.localized_name,
       COUNT(1) OVER (PARTITION BY c_lng.country_code) as lng_count
from EXT_COUNTRY_LNG c_lng
order by c_lng.country_code, c_lng.language_code

Problem

Cutting this input into chunks looks simple: close the chunk once you've read exactly the number of rows given by lng_count and start a new one with the next row read. In practice, though, it turns out not to be so simple :(

The first thing to try is a custom completion policy. But the problem is that it doesn't have access to the last item read by the ItemReader - you have to explicitly put the item into the context in the reader and get it back in the policy. I don't like that, because it requires additional reader modifications/adding reader listeners. Moreover, I don't like the same item being serialized/deserialized back and forth. And I don't feel like JobContext/StepContext is a good place for such data.

There's also RepeatContext which looks like a better place for such data, but I was not able to get to it easily...

So finally we ended up with a solution like this:

@Bean(name = "localizedCountryNamesStep")
@JobScope
public Step insertCountryStep(
        final StepBuilderFactory stepBuilderFactory,
        final MasterdataCountryNameReader countryNameReader,
        final MasterdataCountryNameProcessor countryNameProcessor,
        final MasterdataCountryNameWriter writer) {
    /* Use the same fixed-commit policy, but update its chunk size dynamically */
    final SimpleCompletionPolicy policy = new SimpleCompletionPolicy();
    return stepBuilderFactory.get("localizedCountryNamesStep")
            .<ExtCountryLng, LocalizedCountryName> chunk(policy)
            .reader(countryNameReader)
            .listener(new ItemReadListener<ExtCountryLng>() {

                @Override
                public void beforeRead() {
                    // do nothing
                }

                @Override
                public void afterRead(final ExtCountryLng item) {
                    /* Update the chunk size after every read: consecutive reads
                       within the same country (= same chunk) do nothing, since
                       lngCount is always the same there */
                    policy.setChunkSize(item.getLngCount());
                }

                @Override
                public void onReadError(final Exception ex) {
                    // do nothing
                }
            })
            .processor(countryNameProcessor)
            .writer(writer)
            .faultTolerant()
            .skip(RuntimeException.class)
            .skipLimit(Integer.MAX_VALUE) // Batch does not support unlimited skip
            .retryLimit(0) // this disables only retry, not recovery
            .build();
}

It's working and it requires minimal code changes, but it still feels a bit ugly to me. So I'm wondering: is there another, more elegant way to do a dynamic chunk size in Spring Batch when all the required information is already available in the ItemReader?

asked May 23 '16 by FlasH from Ru




1 Answer

The easiest way would be to simply partition your step by country. That way each country would get its own step, and you would also be able to thread across countries for increased performance.

If it needs to be a single reader, you can wrap a delegate PeekableItemReader and extend SimpleCompletionPolicy to accomplish your goal.

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.PeekableItemReader;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.repeat.RepeatContext;
import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;

public class CountryPeekingCompletionPolicyReader extends SimpleCompletionPolicy implements ItemReader<CountrySpecificItem> {

    private PeekableItemReader<? extends CountrySpecificItem> delegate;

    private CountrySpecificItem currentReadItem = null;

    // Setter needed for the XML property injection below
    public void setDelegate(final PeekableItemReader<? extends CountrySpecificItem> delegate) {
        this.delegate = delegate;
    }

    @Override
    public CountrySpecificItem read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        currentReadItem = delegate.read();
        return currentReadItem;
    }

    @Override
    public RepeatContext start(final RepeatContext context) {
        return new ComparisonPolicyTerminationContext(context);
    }

    protected class ComparisonPolicyTerminationContext extends SimpleTerminationContext {

        public ComparisonPolicyTerminationContext(final RepeatContext context) {
            super(context);
        }

        @Override
        public boolean isComplete() {
            final CountrySpecificItem nextReadItem;
            try {
                // peek() declares a checked exception, so wrap it here
                nextReadItem = delegate.peek();
            } catch (final Exception e) {
                throw new IllegalStateException("Unable to peek at next item", e);
            }

            // Chunk is complete when there is nothing left to read
            // or the next item belongs to a different country
            return nextReadItem == null || !currentReadItem.isSameCountry(nextReadItem);
        }
    }
}
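Stripped of the Spring Batch plumbing, the boundary check above boils down to: close the current chunk whenever the peeked item belongs to a different country than the item just read. Here is a minimal, dependency-free sketch of that logic (all names are illustrative, not from the original code):

```java
import java.util.ArrayList;
import java.util.List;

public class CountryChunkDemo {

    // Illustrative stand-in for the reader's item type
    static final class Row {
        final String countryCode;
        final String languageCode;

        Row(final String countryCode, final String languageCode) {
            this.countryCode = countryCode;
            this.languageCode = languageCode;
        }
    }

    /**
     * Splits rows into chunks, closing a chunk whenever the "peeked" next row
     * has a different country code than the row just read (or input is exhausted).
     */
    static List<List<Row>> chunkByCountry(final List<Row> rows) {
        final List<List<Row>> chunks = new ArrayList<>();
        List<Row> current = new ArrayList<>();
        for (int i = 0; i < rows.size(); i++) {
            current.add(rows.get(i));
            // "peek" at the next row, as PeekableItemReader.peek() would
            final boolean lastRow = i == rows.size() - 1;
            if (lastRow || !rows.get(i + 1).countryCode.equals(rows.get(i).countryCode)) {
                chunks.add(current); // chunk complete: country boundary reached
                current = new ArrayList<>();
            }
        }
        return chunks;
    }
}
```

Because the input is ordered by country_code (as in the question's SQL), a single linear pass with one-item lookahead is enough; no per-country row counts are needed.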

Then in your context you would define:

<batch:tasklet>
    <batch:chunk chunk-completion-policy="countrySpecificCompletionPolicy" reader="countrySpecificCompletionPolicy" writer="someWriter" />
</batch:tasklet>

<bean id="countrySpecificCompletionPolicy" class="CountryPeekingCompletionPolicyReader">
     <property name="delegate" ref="peekableReader" />
</bean>


<bean id="peekableReader" class="YourPeekableItemReader" />

Edit: Thinking back over your issue, partitioning strikes me as the cleanest approach. Using a partitioned step, each ItemReader (make sure scope="step") will be passed a single countryName from the step execution context. Yes, you'll need a custom Partitioner class to build up your map of execution contexts (one entry per country) and a hard-coded commit interval large enough to accommodate your largest unit of work, but after that everything is very boilerplate, and since each slave step will only be a single chunk, restart should be a relative breeze for any countries that might hit issues.
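For reference, a Spring Batch Partitioner implements `Map<String, ExecutionContext> partition(int gridSize)`, returning one ExecutionContext per partition; the step-scoped reader then pulls its country code out of that context. The sketch below shows the shape of such a partitioner using plain java.util maps standing in for ExecutionContext, so it stays dependency-free; all names and keys are illustrative:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CountryPartitionerSketch {

    /**
     * Mirrors Partitioner#partition: builds one "execution context" per country,
     * each carrying the countryCode that a step-scoped reader would bind to,
     * e.g. via #{stepExecutionContext['countryCode']} in the step definition.
     */
    static Map<String, Map<String, Object>> partitionByCountry(final List<String> countryCodes) {
        final Map<String, Map<String, Object>> contexts = new LinkedHashMap<>();
        for (final String code : countryCodes) {
            final Map<String, Object> ctx = new LinkedHashMap<>();
            ctx.put("countryCode", code); // the one piece of state each slave step needs
            contexts.put("partition-" + code, ctx);
        }
        return contexts;
    }
}
```

In a real implementation the country list would come from a query against EXT_COUNTRY_LNG (e.g. `select distinct country_code ...`), and each map would be a Spring ExecutionContext.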

answered Oct 18 '22 by Dean Clark