I am new to Spring batch and couldn't figure out how to do this..
Basically I have a spring file poller which runs every N mins to look for files with some name (ex: A.txt & B.txt) in certain directory. At any moment in time, there could be max 2 files in this directory (A and B). Through Spring Batch Job, these two files will be processed and persisted to 2 different DB tables.
These files are somewhat similar, so the same processor/writer is used.
Right now the way I set up, every polling cycle 1 file is picked up and job is ran.
Let's say there are 2 files in the directory (A.txt and B.txt), is there a way to create 2 jobs so that both jobs can be run in parallel?
1) Multi-Threaded Steps Spring Batch Parallel Processing is each chunk in its own thread by adding a task executor to the step. If there are a million records to process and each chunk is 1000 records, and the task executor exposes four threads, you can handle 4000 records in parallel instead of 1000 records.
When you are ready to start implementing a job with some parallel processing, Spring Batch offers a range of options, which are described in this chapter, although some features are covered elsewhere. At a high level, there are two modes of parallel processing: Single process, multi-threaded. Multi-process.
getBean("jobLauncher"); or do Autowired for this JobLauncher bean in main class and launch specific jobs sequentially in specific sequential order by invoking , jobLauncher. run(job, jobParameters) . You can get specific job instances from context initialized at step # 2.
Multithreaded steps. By default, Spring Batch uses the same thread to execute a batch job from start to finish, meaning that everything runs sequentially. Spring Batch also allows multithreading at the step level. This makes it possible to process chunks using several threads.
There are very good approaches in order to run jobs in async mode with Spring, it is just a matter of how is configured the JobLauncher
. The JobLauncher
has a taskExecutor
property and the asynchronous execution could be activated depending on the implementation that is assigned to that property.
You can find all the TaskExecutor
types that Spring can provide and depending on your needs select the best approach to accomplish your batch asynchronous jobs. Task Executors Types in Spring
For example SimpleAsyncTaskExecutor
is a task executor that will create a new Thread
on any invocation and that could generate a performance issue if the execution runs with high frequency. In the other hand there are also TaskExecutors
types that provides pooling features in order to reuse resources and maximize the efficiency of the system.
Here is an small example of how configure a ThreadPoolTaskExecutor
:
A) Configure ThreadPoolTaskExecutor Bean
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(15);
taskExecutor.setMaxPoolSize(20);
taskExecutor.setQueueCapacity(30);
return taskExecutor;
}
B) Configure JobLauncher Bean
@Bean
public JobLauncher jobLauncher(ThreadPoolTaskExecutor taskExecutor, JobRepository jobRepository){
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setTaskExecutor(taskExecutor);
jobLauncher.setJobRepository(jobRepository);
return jobLauncher;
}
C) Inject your JobLauncher
and your Jobs
configuration
@Autowired
private JobLauncher jobLauncher;
@Autowired
@Qualifier("job1-file-A")
private Job job1;
@Autowired
@Qualifier("job2-file-B")
private Job job2;
D) Schedule the jobs
@Scheduled(cron = "*/1 * * * * *")
public void run1(){
Map<String, JobParameter> confMap = new HashMap<>();
confMap.put("time", new JobParameter(System.currentTimeMillis()));
JobParameters jobParameters = new JobParameters(confMap);
try {
jobLauncher.run(job1, jobParameters);
}catch (Exception ex){
logger.error(ex.getMessage());
}
}
@Scheduled(cron = "*/1 * * * * *")
public void run2(){
Map<String, JobParameter> confMap = new HashMap<>();
confMap.put("time", new JobParameter(System.currentTimeMillis()));
JobParameters jobParameters = new JobParameters(confMap);
try {
jobLauncher.run(job2, jobParameters);
}catch (Exception ex){
logger.error(ex.getMessage());
}
}
E) Finally on your SpringBoot Class @EnableBatchProcessing
and @EnableScheduling
@EnableBatchProcessing
@EnableScheduling
@SpringBootApplication
public class MyBatchApp {
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With