I am using Kafka in a Spring Boot application. I want to perform the following operations in a single transaction:
listen() {
    produce()
    saveInDb()
}
and
operation() {
    saveInDB()
    produce()
}
I have enabled Kafka transactions with the following configuration:
spring:
  kafka:
    bootstrap-servers: localhost:19092,localhost:29092,localhost:39092
    producer:
      transaction-id-prefix: tx-
    consumer:
      enable-auto-commit: false
      isolation-level: read_committed
and this Java configuration:
@Bean
public ProducerFactory<String, Object> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(props);
    // Setting a transaction id prefix makes the producer factory transactional
    factory.setTransactionIdPrefix("tx-");
    return factory;
}

@Bean
public KafkaTransactionManager<String, Object> kafkaTransactionManager() {
    return new KafkaTransactionManager<>(producerFactory());
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
But I get an error when I use Spring's @Transactional annotation:
@Transactional
operation() {
    saveInDB()
    produce()
}
No bean named 'transactionManager' available: No matching TransactionManager bean found for qualifier 'transactionManager' - neither qualifier match nor bean name match!
I followed the Spring docs here: https://docs.spring.io/spring-kafka/reference/html/#using-kafkatransactionmanager
What am I missing in the configuration?
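The configuration was missing a bean named transactionManager.
Both KafkaTransactionManager and JpaTransactionManager extend AbstractPlatformTransactionManager. Once I declared the KafkaTransactionManager bean, Spring Boot most likely stopped auto-configuring its own JpaTransactionManager (the auto-configuration backs off when a transaction manager bean already exists), so nothing was registered under the name transactionManager.
Defining a JpaTransactionManager bean with that name and marking it as the primary bean fixed the issue.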
@Bean
@Primary
public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
    return new JpaTransactionManager(entityManagerFactory);
}
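With this bean in place, @Transactional starts a JPA transaction and KafkaTemplate synchronizes its Kafka transaction with it. According to the Spring Kafka documentation, the Kafka commit or rollback then occurs after the database transaction completes.
Reference: https://docs.spring.io/spring-kafka/reference/html/#transactions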
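To make the synchronization between the two transactions more concrete, here is a minimal plain-Java model of the ordering as I understand it from the docs. It does not use Spring or Kafka at all: TxSyncSketch, PrimaryTx, Synchronization and simulate are hypothetical names invented for this illustration, and the "transactions" are just log entries. It is only a sketch of the commit order, not how Spring implements it.

```java
import java.util.ArrayList;
import java.util.List;

public class TxSyncSketch {

    /** Hypothetical stand-in for a resource synchronized with the primary transaction. */
    interface Synchronization {
        void afterCommit();
        void afterRollback();
    }

    /** Toy "primary" (database) transaction that records events in order. */
    static class PrimaryTx {
        final List<String> log = new ArrayList<>();
        private final List<Synchronization> syncs = new ArrayList<>();

        void register(Synchronization s) {
            syncs.add(s);
        }

        void run(Runnable work) {
            log.add("db:begin");
            try {
                work.run();
            } catch (RuntimeException e) {
                // The primary transaction rolls back first, then the synchronized resource aborts.
                log.add("db:rollback");
                syncs.forEach(Synchronization::afterRollback);
                return;
            }
            // The primary transaction commits first, then the synchronized resource commits.
            log.add("db:commit");
            syncs.forEach(Synchronization::afterCommit);
        }
    }

    /** Runs saveInDB() then produce() inside the toy transaction and returns the event log. */
    static List<String> simulate(boolean failInWork) {
        PrimaryTx tx = new PrimaryTx();
        // Models the Kafka transaction being synchronized with the database transaction.
        tx.register(new Synchronization() {
            public void afterCommit() { tx.log.add("kafka:commit"); }
            public void afterRollback() { tx.log.add("kafka:abort"); }
        });
        tx.run(() -> {
            tx.log.add("saveInDB");
            tx.log.add("produce");
            if (failInWork) {
                throw new IllegalStateException("simulated failure");
            }
        });
        return tx.log;
    }

    public static void main(String[] args) {
        System.out.println(simulate(false)); // [db:begin, saveInDB, produce, db:commit, kafka:commit]
        System.out.println(simulate(true));  // [db:begin, saveInDB, produce, db:rollback, kafka:abort]
    }
}
```

Note that in this model the two commits are sequential, not atomic: if the second (Kafka) commit failed after the database commit, the database change would already be persisted. As far as I can tell the same caveat applies to the real setup, so consumers should be prepared for that case.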