I have a problem with creating an asynchronous processor in Spring Batch. My processor gets an ID from the reader and creates an object based on the response from a SOAP call. Sometimes one input (ID) requires e.g. 60-100 SOAP calls, and sometimes just 1. I tried a multi-threaded step that processed e.g. 50 inputs at a time, but it was useless: 49 threads finished their work in 1 second and sat blocked, waiting for the one thread that was making 60-100 SOAP calls. Now I use AsyncItemProcessor + AsyncItemWriter, but this solution is still too slow for me. Since my input (IDs) is large, around 25k items read from the DB, I would like to have ~50-100 inputs in flight at a time.
Here is my configuration:
@Configuration
public class BatchConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DatabaseConfig databaseConfig;

    @Value(value = "classpath:Categories.txt")
    private Resource categories;

    @Bean
    public Job processJob() throws Exception {
        return jobBuilderFactory.get("processJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener())
                .flow(orderStep1())
                .end()
                .build();
    }

    @Bean
    public Step orderStep1() throws Exception {
        return stepBuilderFactory.get("orderStep1")
                .<Category, CategoryDailyResult>chunk(1)
                .reader(reader())
                .processor(asyncItemProcessor())
                .writer(asyncItemWriter())
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public JobExecutionListener listener() {
        return new JobCompletionListener();
    }

    @Bean
    public ItemWriter asyncItemWriter() {
        AsyncItemWriter<CategoryDailyResult> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(itemWriter());
        return asyncItemWriter;
    }

    @Bean
    public ItemWriter<CategoryDailyResult> itemWriter() {
        return new Writer();
    }

    @Bean
    public ItemProcessor asyncItemProcessor() {
        AsyncItemProcessor<Category, CategoryDailyResult> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(itemProcessor());
        asyncItemProcessor.setTaskExecutor(taskExecutor());
        return asyncItemProcessor;
    }

    @Bean
    public ItemProcessor<Category, CategoryDailyResult> itemProcessor() {
        return new Processor();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(50);
        return taskExecutor;
    }

    @Bean(destroyMethod = "")
    public ItemReader<Category> reader() throws Exception {
        String query = "select c from Category c where not exists elements(c.children)";
        JpaPagingItemReader<Category> reader = new JpaPagingItemReader<>();
        reader.setSaveState(false);
        reader.setQueryString(query);
        reader.setEntityManagerFactory(databaseConfig.entityManagerFactory().getObject());
        reader.setPageSize(1);
        return reader;
    }
}
How can I boost my application? Or am I doing something wrong? Any feedback is welcome ;)
@Edit: For an input of IDs 1 to 100, I want e.g. 50 threads executing the processor. I want them not to block each other: while Thread1 processes input "1" for 2 minutes, Thread2 should be able to process inputs "2", "8", "64", which are small and finish in a few seconds.
@Edit2:
My goal: I have 25k IDs in the database. I read them with a JpaPagingItemReader, and every ID is processed by the processor. Each item is independent of the others. For each ID I make a SOAP call 0-100 times in a loop, then create an object which I pass to the Writer and save in the database. How can I obtain the best performance for such a task?
Spring Batch can be scaled in four ways: multi-threaded steps, AsyncItemProcessor/AsyncItemWriter, partitioning, and remote chunking. Multiple jobs can also be run concurrently.
A TaskExecutor with a throttle limit works by delegating to an existing task executor and limiting the number of tasks submitted: the throttle limit caps the number of pending requests over and above whatever limits the delegate executor itself enforces.
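For illustration, a minimal sketch of that idea, assuming the same Spring Batch 4.x builder API as the question and reusing its reader(), itemProcessor(), itemWriter(), and stepBuilderFactory (the bean name, chunk size, and pool sizes below are just examples):

@Bean
public TaskExecutor pooledTaskExecutor() {
    // Unlike SimpleAsyncTaskExecutor, this reuses a fixed pool of threads
    // instead of spawning a new thread for every task.
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(50);
    executor.setMaxPoolSize(100);
    executor.setQueueCapacity(200);
    executor.initialize();
    return executor;
}

@Bean
public Step throttledStep() throws Exception {
    return stepBuilderFactory.get("throttledStep")
            .<Category, CategoryDailyResult>chunk(10)
            .reader(reader())
            .processor(itemProcessor())
            .writer(itemWriter())
            .taskExecutor(pooledTaskExecutor())
            .throttleLimit(50) // at most 50 chunks processed concurrently
            .build();
}

Whether this beats the AsyncItemProcessor setup depends on where the time is actually spent; the partitioning approach below sidesteps the single shared reader entirely.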
You should partition your job. Add a partitioned step like so:
@Bean
public Step partitionedOrderStep1(Step orderStep1) {
    return stepBuilderFactory.get("partitionedOrderStep1")
            .partitioner("orderStep1", new SimplePartitioner())
            .step(orderStep1)
            .gridSize(10) // number of concurrent partitions
            .taskExecutor(taskExecutor())
            .build();
}
Then use that Step in your Job definition. The .gridSize() call configures the number of partitions to execute concurrently. If any of your Reader, Processor, or Writer objects are stateful, you need to annotate them with @StepScope.
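Note that SimplePartitioner just creates a given number of identical, empty execution contexts, so every partition would see the same data. A minimal sketch of a range-based Partitioner, assuming your IDs are numeric and using illustrative context keys minId/maxId (not part of the original code), could look like this:

import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

public class IdRangePartitioner implements Partitioner {

    private final long minId;
    private final long maxId;

    public IdRangePartitioner(long minId, long maxId) {
        this.minId = minId;
        this.maxId = maxId;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        long rangeSize = (maxId - minId) / gridSize + 1;
        Map<String, ExecutionContext> partitions = new HashMap<>();
        for (int i = 0; i < gridSize; i++) {
            long start = minId + i * rangeSize;
            long end = Math.min(maxId, start + rangeSize - 1);
            ExecutionContext context = new ExecutionContext();
            // Each partition gets its own slice of the ID range
            // (the keys are illustrative and read back in the step-scoped reader).
            context.putLong("minId", start);
            context.putLong("maxId", end);
            partitions.put("partition" + i, context);
        }
        return partitions;
    }
}

The step-scoped reader can then pull its slice from the step execution context, e.g. with @Value("#{stepExecutionContext['minId']}"), and add an ID range condition to its query, so each partition processes only its own IDs.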