Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to configure kafka transaction manager with spring transaction in spring boot app

I am using Kafka in the spring boot application. I want to perform operations in one transaction like given below.

listen(){
 produce()
 saveInDb()
} 

and

operation(){
 saveInDB()
 produce()
}

I have enabled Kafka transactions using the below configurations

spring:
  kafka:
    bootstrap-servers: localhost:19092,localhost:29092,localhost:39092
    producer:
      transaction-id-prefix: tx-
    consumer:
      enable-auto-commit: false
      isolation-level: read_committed

and using this configuration

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(props);
        factory.setTransactionIdPrefix("tx-");
        return factory;
    }

    @Bean
    public KafkaTransactionManager kafkaTransactionManager() {
        KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory());
        return manager;
    }
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

But I am getting an error when tried with spring @Transactional annotation

@Transactional
operation(){
 saveInDB()
 produce()
} 
No bean named 'transactionManager' available: No matching TransactionManager bean found for qualifier 'transactionManager' - neither qualifier match nor bean name match!

I followed the spring docs here https://docs.spring.io/spring-kafka/reference/html/#using-kafkatransactionmanager

What am I missing in the configuration?

like image 614
Faizan Avatar asked Nov 03 '25 17:11

Faizan


1 Answers

I was missing defining transactionManager bean in configuration. Spring was not able to find it because KafkaTransactionManager extends AbstractPlatformTransactionManager and JpaTransactionManager also extends the same class.

Defining this bean as the primary bean fixed the issue.

    @Bean
    @Primary
    public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
        return new JpaTransactionManager(entityManagerFactory);
    }

Kafka transactions get chained with it and KafkaTemplate will synchronize a transaction with transaction manager.

Reference https://docs.spring.io/spring-kafka/reference/html/#transactions

like image 56
Faizan Avatar answered Nov 05 '25 08:11

Faizan



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!