
Spring Batch asynchronous processor configuration for best performance

I have a problem creating an asynchronous processor in Spring Batch. My processor gets an ID from the reader and creates an object based on the response from a SOAP call. Sometimes one input (ID) requires e.g. 60-100 SOAP calls, and sometimes just one. I tried a multi-threaded step that processed e.g. 50 inputs at a time, but it was useless: 49 threads finished their work in one second and then sat blocked, waiting for the one thread that was making 60-100 SOAP calls. Now I use AsyncItemProcessor + AsyncItemWriter, but this solution is still too slow for me. Since my input is large (around 25k IDs read from the DB), I would like to have ~50-100 inputs in flight at a time.

Here is my configuration:

import java.util.concurrent.Future;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;

@Configuration
public class BatchConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;
    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    @Autowired
    private DatabaseConfig databaseConfig;
    @Value(value = "classpath:Categories.txt")
    private Resource categories;

    @Bean
    public Job processJob() throws Exception {
        return jobBuilderFactory.get("processJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener())
                .flow(orderStep1())
                .end()
                .build();
    }

    @Bean
    public Step orderStep1() throws Exception {
        // AsyncItemProcessor wraps each result in a Future, so the chunk's
        // output type is Future<CategoryDailyResult>
        return stepBuilderFactory.get("orderStep1")
                .<Category, Future<CategoryDailyResult>>chunk(1)
                .reader(reader())
                .processor(asyncItemProcessor())
                .writer(asyncItemWriter())
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public JobExecutionListener listener() {
        return new JobCompletionListener();
    }

    @Bean
    public ItemWriter<Future<CategoryDailyResult>> asyncItemWriter() {
        AsyncItemWriter<CategoryDailyResult> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(itemWriter());
        return asyncItemWriter;
    }

    @Bean
    public ItemWriter<CategoryDailyResult> itemWriter() {
        return new Writer();
    }

    @Bean
    public ItemProcessor<Category, Future<CategoryDailyResult>> asyncItemProcessor() {
        AsyncItemProcessor<Category, CategoryDailyResult> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(itemProcessor());
        asyncItemProcessor.setTaskExecutor(taskExecutor());
        return asyncItemProcessor;
    }

    @Bean
    public ItemProcessor<Category, CategoryDailyResult> itemProcessor() {
        return new Processor();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(50);
        return taskExecutor;
    }

    @Bean(destroyMethod = "")
    public ItemReader<Category> reader() throws Exception {
        String query = "select c from Category c where not exists elements(c.children)";

        JpaPagingItemReader<Category> reader = new JpaPagingItemReader<>();
        reader.setSaveState(false);
        reader.setQueryString(query);
        reader.setEntityManagerFactory(databaseConfig.entityManagerFactory().getObject());
        reader.setPageSize(1);

        return reader;
    }
}

How can I boost my application? Am I doing something wrong? Any feedback welcome ;)

@Edit: For an input of IDs 1 to 100 I want e.g. 50 threads executing the processor, and I want them not to block each other: while Thread1 processes input "1" for 2 minutes, Thread2 should be able to process inputs "2", "8", "64", which are small and finish in a few seconds.

@Edit2: My goal: I have 25k IDs in the database. I read them with a JpaPagingItemReader and every ID is processed by the processor. Each item is independent of the others. For each ID I make 0-100 SOAP calls in a loop, then create an object which I pass to the writer to save in the database. How can I get the best performance for such a task?

asked Aug 18 '17 by crooked



1 Answer

You should partition your job. Add a partitioned step like so:

@Bean
public Step partitionedOrderStep1(Step orderStep1) {
    return stepBuilderFactory.get("partitionedOrderStep1")
            .partitioner("orderStep1", new SimplePartitioner())
            .step(orderStep1)
            .taskExecutor(taskExecutor())
            .gridSize(10) // number of concurrent partitions
            .build();
}

Then use that step in your job definition in place of orderStep1. The .gridSize() call configures the number of partitions to execute concurrently. If any of your reader, processor, or writer beans are stateful, you need to annotate them with @StepScope.
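Note that SimplePartitioner creates empty, identical execution contexts; to actually spread your 25k IDs across the partitions, you would typically implement your own Partitioner that gives each partition a distinct slice of the ID range, which a @StepScope reader then queries. The class below is only a hypothetical sketch of the range-splitting arithmetic (RangeSplitter and the "partitionN" keys are my own names, not Spring Batch API); in a real Partitioner each [start, end] pair would go into an ExecutionContext.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the range-splitting logic a custom Partitioner would use.
// In Spring Batch, each [start, end] pair would be stored in an
// ExecutionContext under keys like "minId"/"maxId", and a @StepScope
// JpaPagingItemReader would read those bounds back via @Value("#{stepExecutionContext['minId']}").
public class RangeSplitter {

    /** Splits the inclusive ID range [minId, maxId] into up to gridSize contiguous slices. */
    public static Map<String, long[]> split(long minId, long maxId, int gridSize) {
        Map<String, long[]> partitions = new LinkedHashMap<>();
        long targetSize = (maxId - minId) / gridSize + 1;
        long start = minId;
        for (int i = 0; i < gridSize && start <= maxId; i++) {
            long end = Math.min(start + targetSize - 1, maxId);
            partitions.put("partition" + i, new long[]{start, end});
            start = end + 1;
        }
        return partitions;
    }

    public static void main(String[] args) {
        // 25k IDs split across 10 partitions: each worker gets a 2500-ID slice
        split(1, 25000, 10).forEach((key, range) ->
                System.out.println(key + ": " + range[0] + "-" + range[1]));
    }
}
```

With this scheme a slow partition only delays its own slice; the other partitions keep draining their ID ranges independently, which addresses the "one slow item blocks 49 threads" problem from the question.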

answered Oct 08 '22 by Joe Chiavaroli