My main job does only read operations and the other one does some writing but on MyISAM engine
which ignores transactions, so I wouldn't require necessarily transaction support. How can I configure Spring Batch
to have its own datasource for the JobRepository
, separate from the one holding the business data? The initial one datasource-configurations is done like the following:
@Configuration public class StandaloneInfrastructureConfiguration { @Autowired Environment env; @Bean public LocalContainerEntityManagerFactoryBean entityManagerFactory() { LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean(); em.setDataSource(dataSource()); em.setPackagesToScan(new String[] { "org.podcastpedia.batch.*" }); JpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter(); em.setJpaVendorAdapter(vendorAdapter); em.setJpaProperties(additionalJpaProperties()); return em; } Properties additionalJpaProperties() { Properties properties = new Properties(); properties.setProperty("hibernate.hbm2ddl.auto", "none"); properties.setProperty("hibernate.dialect", "org.hibernate.dialect.MySQL5Dialect"); properties.setProperty("hibernate.show_sql", "true"); return properties; } @Bean public DataSource dataSource(){ return DataSourceBuilder.create() .url(env.getProperty("db.url")) .driverClassName(env.getProperty("db.driver")) .username(env.getProperty("db.username")) .password(env.getProperty("db.password")) .build(); } @Bean public PlatformTransactionManager transactionManager(EntityManagerFactory emf){ JpaTransactionManager transactionManager = new JpaTransactionManager(); transactionManager.setEntityManagerFactory(emf); return transactionManager; } }
and then it is imported in the Job
's configuration class where the @EnableBatchProcessing
annotation automagically makes use of it. My initial thought was to try to set the configuration class extend the DefaultBatchConfigurer
, but then I get a
BeanCurrentlyInCreationException ( org.springframework.beans.factory.BeanCurrentlyInCreationException: Error creating bean with name jobBuilders: Requested bean is currently in creation: Is there an unresolvable circular reference?):
@Configuration @EnableBatchProcessing @Import({StandaloneInfrastructureConfiguration.class, NotifySubscribersServicesConfiguration.class}) public class NotifySubscribersJobConfiguration extends DefaultBatchConfigurer { @Autowired private JobBuilderFactory jobBuilders; @Autowired private StepBuilderFactory stepBuilders; @Autowired private DataSource dataSource; @Autowired Environment env; @Override @Autowired public void setDataSource(javax.sql.DataSource dataSource) { super.setDataSource(batchDataSource()); } private DataSource batchDataSource(){ return DataSourceBuilder.create() .url(env.getProperty("batchdb.url")) .driverClassName(env.getProperty("batchdb.driver")) .username(env.getProperty("batchdb.username")) .password(env.getProperty("batchdb.password")) .build(); } @Bean public ItemReader<User> notifySubscribersReader(){ JdbcCursorItemReader<User> reader = new JdbcCursorItemReader<User>(); String sql = "select * from users where is_email_subscriber is not null"; reader.setSql(sql); reader.setDataSource(dataSource); reader.setRowMapper(rowMapper()); return reader; } ........ }
Any thoughts are more than welcomed. The project is available on GitHub - https://github.com/podcastpedia/podcastpedia-batch
Thanks a bunch.
With the xml one you have to be explicit in which datasource Spring Batch uses. If you don't declare it explicitly with Java based configuration it will try to detect the datasource to work, which will only work in case a single datasource is detected. YOu could try annotating the one to use for Batch with @Primary .
Using skip and skipLimit. First of all, to enable skip functionality, we need to include a call to faultTolerant() during the step-building process. Within skip() and skipLimit(), we define the exceptions we want to skip and the maximum number of skipped items.
read an ASCII file with fixed column length values sent by a third-party with previously specified layout (STEP 1 reader) validate the read values and register (Log file) the errors (custom messages) Apply some business logic on the processor to filter any undesirable lines (STEP 1 processor)
Spring Cloud Data Flow is a cloud-native toolkit for building real-time data pipelines and batch processes. Spring Cloud Data Flow is ready to be used for a range of data processing use cases like simple import/export, ETL processing, event streaming, and predictive analytics.
Ok, this is strange but it works. Moving the datasources to it's own configuration class works just fine and one is able to autowire.
The example is a multi-datasource version of Spring Batch Service Example:
DataSourceConfiguration:
public class DataSourceConfiguration { @Value("classpath:schema-mysql.sql") private Resource schemaScript; @Bean @Primary public DataSource hsqldbDataSource() throws SQLException { final SimpleDriverDataSource dataSource = new SimpleDriverDataSource(); dataSource.setDriver(new org.hsqldb.jdbcDriver()); dataSource.setUrl("jdbc:hsqldb:mem:mydb"); dataSource.setUsername("sa"); dataSource.setPassword(""); return dataSource; } @Bean public JdbcTemplate jdbcTemplate(final DataSource dataSource) { return new JdbcTemplate(dataSource); } @Bean public DataSource mysqlDataSource() throws SQLException { final SimpleDriverDataSource dataSource = new SimpleDriverDataSource(); dataSource.setDriver(new com.mysql.jdbc.Driver()); dataSource.setUrl("jdbc:mysql://localhost/spring_batch_example"); dataSource.setUsername("test"); dataSource.setPassword("test"); DatabasePopulatorUtils.execute(databasePopulator(), dataSource); return dataSource; } @Bean public JdbcTemplate mysqlJdbcTemplate(@Qualifier("mysqlDataSource") final DataSource dataSource) { return new JdbcTemplate(dataSource); } private DatabasePopulator databasePopulator() { final ResourceDatabasePopulator populator = new ResourceDatabasePopulator(); populator.addScript(schemaScript); return populator; } }
BatchConfiguration:
@Configuration @EnableBatchProcessing @Import({ DataSourceConfiguration.class, MBeanExporterConfig.class }) public class BatchConfiguration { @Autowired private JobBuilderFactory jobs; @Autowired private StepBuilderFactory steps; @Bean public ItemReader<Person> reader() { final FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>(); reader.setResource(new ClassPathResource("sample-data.csv")); reader.setLineMapper(new DefaultLineMapper<Person>() { { setLineTokenizer(new DelimitedLineTokenizer() { { setNames(new String[] { "firstName", "lastName" }); } }); setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() { { setTargetType(Person.class); } }); } }); return reader; } @Bean public ItemProcessor<Person, Person> processor() { return new PersonItemProcessor(); } @Bean public ItemWriter<Person> writer(@Qualifier("mysqlDataSource") final DataSource dataSource) { final JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>(); writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>()); writer.setSql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)"); writer.setDataSource(dataSource); return writer; } @Bean public Job importUserJob(final Step s1) { return jobs.get("importUserJob").incrementer(new RunIdIncrementer()).flow(s1).end().build(); } @Bean public Step step1(final ItemReader<Person> reader, final ItemWriter<Person> writer, final ItemProcessor<Person, Person> processor) { return steps.get("step1") .<Person, Person> chunk(1) .reader(reader) .processor(processor) .writer(writer) .build(); } }
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With