
How to run multiple jobs in spring batch using annotations

I am using Spring Boot with Spring Batch (annotation-based configuration) and have come across a scenario where I have to run two jobs.

I have Employee and Salary records which need to be updated using Spring Batch. I configured the batch configuration classes by following the Spring Batch getting started tutorial, one for Employee objects and one for Salary objects, named BatchConfigurationEmployee and BatchConfigurationSalary respectively.

I have defined the ItemReader, ItemProcessor, ItemWriter and Job by following the tutorial mentioned above.

When I start my Spring Boot application, only one of the jobs runs. I want to run both batch configuration classes. How can I achieve this?

********* BatchConfigurationEmployee.java *************

@Configuration
@EnableBatchProcessing
public class BatchConfigurationEmployee {
    public ItemReader<Employee> reader() {
        return new EmployeeItemReader();
    }

    @Bean
    public ItemProcessor<Employee, Employee> processor() {
        return new EmployeeItemProcessor();
    }

    @Bean   
    public Job Employee(JobBuilderFactory jobs, Step s1) {
        return jobs.get("Employee")
                .incrementer(new RunIdIncrementer())
                .flow(s1)
                .end()
                .build();
    }

    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Employee> reader,
                    ItemProcessor<Employee, Employee> processor) {
        return stepBuilderFactory.get("step1")
                .<Employee, Employee> chunk(1)
                .reader(reader)
                .processor(processor)
                .build();
    }
}

The Salary class is here:

@Configuration
@EnableBatchProcessing
public class BatchConfigurationSalary {
    public ItemReader<Salary> reader() {
        return new SalaryItemReader();
    }

    @Bean
    public ItemProcessor<Salary, Salary> processor() {
        return new SalaryItemProcessor();
    }

    @Bean
    public Job salary(JobBuilderFactory jobs, Step s1) {
        return jobs.get("Salary")
                .incrementer(new RunIdIncrementer())
                .flow(s1)
                .end()
                .build();
    }

    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Salary> reader,
                    ItemProcessor<Salary, Salary> processor) {
        return stepBuilderFactory.get("step1")
                .<Salary, Salary> chunk(1)
                .reader(reader)
                .processor(processor)
                .build();
    }
}
newstackoverflowuser5555 asked Nov 29 '15 22:11




2 Answers

Your jobs are not annotated with @Bean, so the Spring context doesn't know about them.

Have a look at the class JobLauncherCommandLineRunner. All beans in the Spring context that implement the Job interface are injected into it, and every job that is found is executed (this happens inside the method executeLocalJobs of JobLauncherCommandLineRunner).

If, for some reason, you don't want to have them as beans in the context, then you have to register your jobs with the JobRegistry (the method executeRegisteredJobs of JobLauncherCommandLineRunner takes care of launching the registered jobs).

BTW, you can control with the property

spring.batch.job.names= # Comma-separated list of job names to execute on startup (for instance `job1,job2`). By default, all jobs found in the context are executed.

which jobs should be launched.
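
To make the effect of that property concrete, here is a minimal plain-Java sketch of the selection it describes. It is not the Spring Boot implementation: the class JobNameFilter and its method select are hypothetical names invented for this illustration, and the job names "Employee" and "Salary" are the ones passed to jobs.get(...) in the question. In a real application you would only set something like spring.batch.job.names=Employee,Salary in application.properties.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Spring Boot: it only mimics the documented
// effect of spring.batch.job.names on the set of jobs launched at startup.
class JobNameFilter {

    // Returns the job names that would be launched, in the order they were found.
    // A null or blank property means "launch every job found in the context".
    static List<String> select(List<String> jobsInContext, String jobNamesProperty) {
        if (jobNamesProperty == null || jobNamesProperty.trim().isEmpty()) {
            return new ArrayList<>(jobsInContext);
        }
        // Split the comma-separated property value and trim each entry.
        List<String> wanted = new ArrayList<>();
        for (String name : jobNamesProperty.split(",")) {
            wanted.add(name.trim());
        }
        // Keep only the jobs whose names were listed.
        List<String> selected = new ArrayList<>();
        for (String job : jobsInContext) {
            if (wanted.contains(job)) {
                selected.add(job);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> jobs = Arrays.asList("Employee", "Salary");
        System.out.println(select(jobs, null));
        System.out.println(select(jobs, "Salary"));
        System.out.println(select(jobs, "Employee, Salary"));
    }
}
```

Leaving the property unset corresponds to the first call, which is the default behaviour described above.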

Hansjoerg Wingeier answered Oct 13 '22 04:10


I feel that this is also a pretty good way to run multiple jobs.

I am making use of a JobLauncher to configure and execute the jobs, and independent CommandLineRunner implementations to run them. These are ordered to make sure they are executed sequentially in the required order.

Apologies for the long post, but I wanted to give a clear picture of what can be achieved using JobLauncher configurations with multiple command line runners.

This is the current BeanConfiguration that I have

@Configuration
public class BeanConfiguration {

    @Autowired
    DataSource dataSource;

    @Autowired
    PlatformTransactionManager transactionManager;

    @Bean(name="jobOperator")
     public JobOperator jobOperator(JobExplorer jobExplorer,

                                    JobRegistry jobRegistry) throws Exception {

            SimpleJobOperator jobOperator = new SimpleJobOperator();

            jobOperator.setJobExplorer(jobExplorer);
            jobOperator.setJobRepository(createJobRepository());
            jobOperator.setJobRegistry(jobRegistry);
            jobOperator.setJobLauncher(jobLauncher());

            return jobOperator;
     }

    /**
     * Configure the JobLauncher to execute jobs asynchronously
     * using the ThreadPoolTaskExecutor
     * @return
     * @throws Exception
     */
    @Bean
    public JobLauncher jobLauncher() throws Exception {
            SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
            jobLauncher.setJobRepository(createJobRepository());
            jobLauncher.setTaskExecutor(taskExecutor());
            jobLauncher.afterPropertiesSet();
            return jobLauncher;
    }

    // Read the datasource and set in the job repo
    protected JobRepository createJobRepository() throws Exception {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(dataSource);
        factory.setTransactionManager(transactionManager);
        factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
        //factory.setTablePrefix("BATCH_");
        factory.setMaxVarCharLength(10000);
        return factory.getObject();
    }

    @Bean
    public RestTemplateBuilder restTemplateBuilder() {
     return new RestTemplateBuilder().additionalInterceptors(new CustomRestTemplateLoggerInterceptor());
    }

    @Bean(name=AppConstants.JOB_DECIDER_BEAN_NAME_EMAIL_INIT)
    public JobExecutionDecider jobDecider() {
        return new EmailInitJobExecutionDecider();
    }

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(15);
        taskExecutor.setMaxPoolSize(20);
        taskExecutor.setQueueCapacity(30);
        return taskExecutor;
    }
}

I have set up the database to hold the job execution details in PostgreSQL, and hence the DatasourceConfiguration looks like this (two different beans for two different profiles, one per environment):

@Configuration
public class DatasourceConfiguration implements EnvironmentAware {

private Environment env;

@Bean
@Qualifier(AppConstants.DB_BEAN)
@Profile("dev")
public DataSource getDataSource() {
    HikariDataSource ds = new HikariDataSource();

    boolean isAutoCommitEnabled = env.getProperty("spring.datasource.hikari.auto-commit") != null ? Boolean.parseBoolean(env.getProperty("spring.datasource.hikari.auto-commit")):false;
    ds.setAutoCommit(isAutoCommitEnabled);
    // Connection test query is for legacy connections
    //ds.setConnectionInitSql(env.getProperty("spring.datasource.hikari.connection-test-query"));
    ds.setPoolName(env.getProperty("spring.datasource.hikari.pool-name"));
    ds.setDriverClassName(env.getProperty("spring.datasource.driver-class-name"));
    long timeout = env.getProperty("spring.datasource.hikari.idleTimeout") != null ? Long.parseLong(env.getProperty("spring.datasource.hikari.idleTimeout")): 40000;
    ds.setIdleTimeout(timeout);
    long maxLifeTime = env.getProperty("spring.datasource.hikari.maxLifetime") != null ? Long.parseLong(env.getProperty("spring.datasource.hikari.maxLifetime")): 1800000 ;
    ds.setMaxLifetime(maxLifeTime);
    ds.setJdbcUrl(env.getProperty("spring.datasource.url"));
    ds.setPoolName(env.getProperty("spring.datasource.hikari.pool-name"));
    ds.setUsername(env.getProperty("spring.datasource.username"));
    ds.setPassword(env.getProperty("spring.datasource.password"));
    int poolSize = env.getProperty("spring.datasource.hikari.maximum-pool-size") != null ? Integer.parseInt(env.getProperty("spring.datasource.hikari.maximum-pool-size")): 10;
    ds.setMaximumPoolSize(poolSize);

    return ds;
}

@Bean
@Qualifier(AppConstants.DB_PROD_BEAN)
@Profile("prod")

public DataSource getProdDatabase() {
    HikariDataSource ds = new HikariDataSource();

    boolean isAutoCommitEnabled = env.getProperty("spring.datasource.hikari.auto-commit") != null ? Boolean.parseBoolean(env.getProperty("spring.datasource.hikari.auto-commit")):false;
    ds.setAutoCommit(isAutoCommitEnabled);
    // Connection test query is for legacy connections
    //ds.setConnectionInitSql(env.getProperty("spring.datasource.hikari.connection-test-query"));
    ds.setPoolName(env.getProperty("spring.datasource.hikari.pool-name"));
    ds.setDriverClassName(env.getProperty("spring.datasource.driver-class-name"));
    long timeout = env.getProperty("spring.datasource.hikari.idleTimeout") != null ? Long.parseLong(env.getProperty("spring.datasource.hikari.idleTimeout")): 40000;
    ds.setIdleTimeout(timeout);
    long maxLifeTime = env.getProperty("spring.datasource.hikari.maxLifetime") != null ? Long.parseLong(env.getProperty("spring.datasource.hikari.maxLifetime")): 1800000 ;
    ds.setMaxLifetime(maxLifeTime);
    ds.setJdbcUrl(env.getProperty("spring.datasource.url"));
    ds.setPoolName(env.getProperty("spring.datasource.hikari.pool-name"));
    ds.setUsername(env.getProperty("spring.datasource.username"));
    ds.setPassword(env.getProperty("spring.datasource.password"));
    int poolSize = env.getProperty("spring.datasource.hikari.maximum-pool-size") != null ? Integer.parseInt(env.getProperty("spring.datasource.hikari.maximum-pool-size")): 10;
    ds.setMaximumPoolSize(poolSize);

    return ds;
}

public void setEnvironment(Environment environment) {
    // TODO Auto-generated method stub
    this.env = environment;
}

}

Make sure that the initial app launcher captures the application context, which is returned once the job execution terminates (either failed or completed), so that you can gracefully shut down the JVM. Otherwise, using the JobLauncher keeps the JVM alive even after all jobs have completed.

@SpringBootApplication
@ComponentScan(basePackages="com.XXXX.Feedback_File_Processing.*")
@EnableBatchProcessing
public class FeedbackFileProcessingApp 
{
    public static void main(String[] args) throws Exception {
        ApplicationContext appContext = SpringApplication.run(FeedbackFileProcessingApp.class, args);
        // The batch job has finished by this point because the 
        //   ApplicationContext is not 'ready' until the job is finished
        // Also, use System.exit to force the Java process to finish with the exit code returned from the Spring App
        System.exit(SpringApplication.exit(appContext));
    }

}
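
The JVM staying alive can be reproduced without Spring. The sketch below is plain Java and only an analogy: it assumes the cause is the executor's non-daemon worker threads, and the class AsyncLaunchShutdownSketch and its method runAndShutDown are hypothetical names. It submits a fake "job" to a thread pool, waits for it, and then shuts the pool down, which is roughly what closing the application context does for the ThreadPoolTaskExecutor bean.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Plain-Java stand-in for an asynchronous job launch; no Spring classes are used.
class AsyncLaunchShutdownSketch {

    // Submits a fake "job" to a pool, waits for its result, then shuts the pool down.
    // Returns true if the pool terminated, i.e. no worker thread is left to keep the JVM alive.
    static boolean runAndShutDown() {
        ExecutorService pool = Executors.newFixedThreadPool(2); // non-daemon worker threads
        try {
            Future<String> execution = pool.submit(() -> "COMPLETED");
            System.out.println("job finished with status " + execution.get(5, TimeUnit.SECONDS));
            // Without this call the idle worker threads would keep the process running.
            pool.shutdown();
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            pool.shutdownNow();
            throw new IllegalStateException("fake job did not finish", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("pool terminated: " + runAndShutDown());
    }
}
```

If the pool.shutdown() call is removed, the main method returns but the process keeps running, which matches the behaviour described above for the asynchronous JobLauncher.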

And so on. You can configure your own decider and your own jobs/steps, as you described above, in two different configurations like the ones below, and use them separately in command line runners (since the post is getting long, I am giving the details of just the jobs and the command line runners).

These are the two jobs

@Configuration
public class DefferalJobConfiguration {

    @Autowired
    JobLauncher joblauncher;

    @Autowired
    private JobBuilderFactory jobFactory;

    @Autowired
    private StepBuilderFactory stepFactory;

    @Bean
    @StepScope
    public Tasklet newSampleTasklet() {
        return ((stepExecution, chunkContext) -> {
            System.out.println("execution of step after flow");
            return RepeatStatus.FINISHED;
        });
    }

    @Bean
    public Step sampleStep() {
        return stepFactory.get("sampleStep").listener(new CustomStepExecutionListener())
                .tasklet(newSampleTasklet()).build();
    }

    @Autowired
    @Qualifier(AppConstants.FLOW_BEAN_NAME_EMAIL_INITIATION)
    private Flow emailInitFlow;

    @Autowired
    @Qualifier(AppConstants.JOB_DECIDER_BEAN_NAME_EMAIL_INIT)
    private JobExecutionDecider jobDecider;

    @Autowired
    @Qualifier(AppConstants.STEP_BEAN_NAME_ITEMREADER_FETCH_DEFERRAL_CONFIG)
    private Step deferralConfigStep;

    @Bean(name=AppConstants.JOB_BEAN_NAME_DEFERRAL)
    public Job deferralJob() {
        return jobFactory.get(AppConstants.JOB_NAME_DEFERRAL)
                .start(emailInitFlow)
                .on("COMPLETED").to(sampleStep())
                .next(jobDecider).on("COMPLETED").to(deferralConfigStep)
                .on("FAILED").fail()
                .end().build();


    }
}



@Configuration
public class TestFlowJobConfiguration {

    @Autowired
    private JobBuilderFactory jobFactory;

    @Autowired
    @Qualifier("testFlow")
    private Flow testFlow;

    @Bean(name = "testFlowJob")
    public Job testFlowJob() {

        return jobFactory.get("testFlowJob").start(testFlow).end().build();
    }
}

Here are the command line runners (I am making sure that the first job is completed before the second job is initialized, but it is totally up to the user to execute them in parallel following a different strategy):

@Component
@Order(1)
public class DeferralCommandLineRunner implements CommandLineRunner, EnvironmentAware{
    // If no custom JobLauncher is defined, jobs are launched by default using a SimpleJobLauncher
    // with the default configuration (assumption); hence this runner uses the JobLauncher
    // customized with the values set in BeanConfiguration.
    private Environment env;

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    @Qualifier(AppConstants.JOB_BEAN_NAME_DEFERRAL)
    Job deferralJob;

    @Override
    public void run(String... args) throws Exception {
        // TODO Auto-generated method stub
        JobParameters jobparams = new JobParametersBuilder()
                .addString("run.time", LocalDateTime.now().
                        format(DateTimeFormatter.ofPattern(AppConstants.JOB_DATE_FORMATTER_PATTERN)).toString())
                .addString("instance.name", 
                        (deferralJob.getName() != null) ?deferralJob.getName()+'-'+UUID.randomUUID().toString() :
                            UUID.randomUUID().toString())
                .toJobParameters();
        jobLauncher.run(deferralJob, jobparams);
    }

    @Override
    public void setEnvironment(Environment environment) {
        // TODO Auto-generated method stub
        this.env = environment;
    }

}



@Component
@Order(2)
public class TestJobCommandLineRunner implements CommandLineRunner {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    @Qualifier("testFlowJob")
    Job testjob;

    @Autowired
    @Qualifier("jobOperator")
    JobOperator operator;

    @Override
    public void run(String... args) throws Exception {
        // TODO Auto-generated method stub
        JobParameters jobParam = new JobParametersBuilder().addString("name", UUID.randomUUID().toString())
                .toJobParameters();
        System.out.println(operator.getJobNames());
        try {
            Set<Long> deferralExecutionIds = operator.getRunningExecutions(AppConstants.JOB_NAME_DEFERRAL);
            System.out.println("deferralExecutionIds:" + deferralExecutionIds);

            // Stop a deferral execution only if one is still running
            if (!deferralExecutionIds.isEmpty()) {
                operator.stop(deferralExecutionIds.iterator().next());
            }

        } catch (NoSuchJobException | NoSuchJobExecutionException | JobExecutionNotRunningException e) {
            // just add a logging here
            System.out.println("exception caught:" + e.getMessage());
        }
        jobLauncher.run(testjob, jobParam);
    }

}
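
The ordering of the two runners can be pictured with a small plain-Java sketch. It is only an illustration of the rule that a lower @Order value runs first: the class OrderedRunnerSketch, the nested Runner type and the job name strings are hypothetical placeholders, and no Spring class is involved.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class OrderedRunnerSketch {

    // Hypothetical stand-in for a CommandLineRunner annotated with @Order(order).
    static final class Runner {
        final int order;
        final String jobName;

        Runner(int order, String jobName) {
            this.order = order;
            this.jobName = jobName;
        }
    }

    // Invokes the runners one after another, lowest order value first, and
    // returns the job names in the order in which they were started.
    static List<String> runInOrder(List<Runner> runners) {
        List<Runner> sorted = new ArrayList<>(runners);
        sorted.sort(Comparator.comparingInt(r -> r.order));
        List<String> started = new ArrayList<>();
        for (Runner runner : sorted) {
            started.add(runner.jobName); // a real runner would call jobLauncher.run(...) here
        }
        return started;
    }

    public static void main(String[] args) {
        List<Runner> runners = new ArrayList<>();
        runners.add(new Runner(2, "testFlowJob"));
        runners.add(new Runner(1, "deferralJob"));
        System.out.println(runInOrder(runners));
    }
}
```

Note that the order only governs when each runner is invoked. With the asynchronous JobLauncher configured earlier, the first job may still be running when the second runner starts, which is presumably why the second runner asks the JobOperator for running executions before launching its own job.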

Hope this gives a complete idea of how it can be done. I am using spring-boot-starter-batch:jar:2.0.0.RELEASE

vijayakumarpsg587 answered Oct 13 '22 05:10