
Persist issue with a spring batch ItemWriter using a JPA repository

I have an issue with a Spring Batch ItemWriter that relies on a JPA repository to update data.

Here it is:

@Component
public class MessagesDigestMailerItemWriter implements ItemWriter<UserAccount> {

    private static final Logger log = LoggerFactory.getLogger(MessagesDigestMailerItemWriter.class);

    @Autowired
    private MessageRepository messageRepository;

    @Autowired
    private MailerService mailerService;

    @Override
    public void write(List<? extends UserAccount> userAccounts) throws Exception {
        log.info("Mailing messages digests and updating messages notification statuses");

        for (UserAccount userAccount : userAccounts) {
            if (userAccount.isEmailNotification()) {
                mailerService.mailMessagesDigest(userAccount);
            }
            for (Message message : userAccount.getReceivedMessages()) {
                message.setNotificationSent(true);
                messageRepository.save(message);//NOT SAVING!!
            }
        }
    }
}

Here is my Step configuration:

@Configuration
public class MailStepConfiguration {

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private EntityManagerFactory entityManagerFactory;

    @Autowired
    private MessagesDigestMailerItemWriter itemWriter;

    @Bean
    public Step messagesDigestMailingStep() {
        return stepBuilderFactory.get("messagesDigestMailingStep")//
                .<UserAccount, UserAccount> chunk(1)//
                .reader(jpaPagingItemReader(entityManagerFactory))//
                .writer(itemWriter)//
                .build();
    }

    @Bean(destroyMethod = "")
    @StepScope
    public static ItemReader<UserAccount> jpaPagingItemReader(EntityManagerFactory entityManagerFactory) {
        final JpaPagingItemReader<UserAccount> reader = new JpaPagingItemReader<>();
        reader.setEntityManagerFactory(entityManagerFactory);
        reader.setQueryString("SELECT ua FROM UserAccount ua JOIN FETCH ua.receivedMessages msg WHERE msg.notificationSent = false AND msg.messageRead = false");
        return reader;
    }

}

For completeness, here is my Spring Boot configuration:

@Configuration
@EnableBatchProcessing
@EnableAutoConfiguration
@ComponentScan("com.bignibou.batch.configuration")
public class Batch {
    public static void main(String[] args) {
        System.exit(SpringApplication.exit(new SpringApplicationBuilder(Batch.class).web(false).run(args)));
    }
}

and my datasource config:

@Configuration
@EnableJpaRepositories({ "com.bignibou.repository" })
@EntityScan("com.bignibou.domain")
public class DatasourceConfiguration {

    @Bean
    @ConfigurationProperties("spring.datasource.batch")
    public DataSource batchDatasource() {
        return DataSourceBuilder.create().build();
    }

    @Bean
    @Primary
    @ConfigurationProperties("spring.datasource.application")
    public DataSource applicationDatasource() {
        return DataSourceBuilder.create().build();
    }
}

I noticed that execution reaches the ItemWriter's write method and that messageRepository.save(message); does get executed, but the data is not updated.

I suspect it is a transaction issue but I am not sure how to solve this problem...

edit: I forgot to mention that I have two Postgres databases:

  1. one for the job repository data
  2. another one for the application data.

I can confirm that data is written into the job repository database. The issue is with the application data. Am I required to use distributed transactions, given that I have two PG databases?

asked Jul 09 '16 23:07 by balteo




1 Answer

I opened an issue for this here:

https://jira.spring.io/browse/BATCH-2642

In principle, what helped us was to configure a primary transaction manager like so:

@Configuration
public class JpaConfig {

    private final DataSource dataSource;

    @Autowired
    public JpaConfig(@Qualifier("dataSource") DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Bean
    @Primary
    public JpaTransactionManager jpaTransactionManager() {
        final JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setDataSource(dataSource);
        return transactionManager;
    }

}
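The JpaTransactionManager above is never given an EntityManagerFactory; when none is set, it falls back to looking one up from the bean factory. A minimal sketch of the same idea with the factory passed in explicitly, so it is visible which persistence unit the transactions cover. It assumes a single EntityManagerFactory bean built on the application datasource; the class name ExplicitJpaConfig is hypothetical:

```java
import javax.persistence.EntityManagerFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

// Hypothetical variant of JpaConfig: the EntityManagerFactory is injected
// as a bean-method parameter instead of being looked up implicitly.
@Configuration
public class ExplicitJpaConfig {

    @Bean
    @Primary
    public PlatformTransactionManager jpaTransactionManager(EntityManagerFactory entityManagerFactory) {
        // Assumption: this is the EntityManagerFactory backing the JPA repositories.
        return new JpaTransactionManager(entityManagerFactory);
    }
}
```

Whether this behaves differently from the version above depends on how many EntityManagerFactory beans the context holds; with only one, the two should be equivalent.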

And then using the autowired instance of the transaction manager when configuring the step, like so:

@Autowired
private PlatformTransactionManager transactionManager;

private TaskletStep buildTaskletStep() {
    return stepBuilderFactory.get("SendCampaignStep")
            .<UserAccount, UserAccount>chunk(pushServiceConfiguration.getCampaignBatchSize())
            .reader(userAccountItemReader)
            .processor(userAccountItemProcessor)
            .writer(userAccountItemWriter)
            .transactionManager(transactionManager)
            .build();
}
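Applied to the step from the question, the same change might look roughly like the sketch below. It is an illustration under assumptions, not a tested fix: the class name is hypothetical, the UserAccount import is guessed from the @EntityScan package, and the collaborators are injected as bean-method parameters rather than fields.

```java
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import com.bignibou.domain.UserAccount; // assumed package, taken from @EntityScan

// Hypothetical replacement for the step bean in MailStepConfiguration.
@Configuration
public class MailStepWithTransactionManager {

    @Bean
    public Step messagesDigestMailingStep(StepBuilderFactory stepBuilderFactory,
                                          ItemReader<UserAccount> reader,
                                          ItemWriter<UserAccount> writer,
                                          PlatformTransactionManager transactionManager) {
        // Assumption: the injected manager is the @Primary JpaTransactionManager,
        // so each chunk commits through JPA against the application database.
        return stepBuilderFactory.get("messagesDigestMailingStep")
                .<UserAccount, UserAccount>chunk(1)
                .reader(reader)
                .writer(writer)
                .transactionManager(transactionManager)
                .build();
    }
}
```

If the context contains more than one PlatformTransactionManager bean, the @Primary annotation is what decides which one is injected here.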

Data is now correctly persisted, but there is still some magic I don't fully get...

answered Oct 20 '22 09:10 by Petr Dvořák