
Transaction Synchronization in Spring Kafka

I want to synchronize a kafka transaction with a repository transaction:

@Transactional
public void syncTransaction() {
  myRepository.save(someObject);
  kafkaTemplate.send(someEvent);
}

Since the merge (https://github.com/spring-projects/spring-kafka/issues/373), and according to the docs, this is possible. Nevertheless, I have trouble understanding and implementing that feature. Looking at the example in https://docs.spring.io/spring-kafka/reference/html/#transaction-synchronization, it seems I have to create a MessageListenerContainer to listen to my own events. Do I still have to send my events using the KafkaTemplate? Does the MessageListenerContainer prohibit sending to the broker?

And if I understand correctly, the kafkaTemplate and the kafkaTransactionManager have to use the same producerFactory, on which I have to enable transactions by setting a transactionIdPrefix. And in my example I have to set the transaction manager of the messageListenerContainer to the DataSourceTransactionManager. Is that correct?

From my perspective it looks weird that I send an event via kafkaTemplate, listen to my own event and forward the event using the kafkaTemplate again.

It would really help me if I could get an example of a simple synchronization of a Kafka transaction with a repository transaction, along with an explanation.

Eike Behrends, asked Nov 17 '17 15:11




2 Answers

@Eike Behrends: to have a DB + Kafka transaction, you can use ChainedTransactionManager and define it this way:

@Bean
public KafkaTransactionManager kafkaTransactionManager() {
    // producerFactory() is the transactional producer factory bean (it must have a transactionIdPrefix set)
    KafkaTransactionManager ktm = new KafkaTransactionManager(producerFactory());
    ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return ktm;
}


@Bean
@Primary
public JpaTransactionManager transactionManager(EntityManagerFactory em) {
    return new JpaTransactionManager(em);
}

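// Transactions are started in the listed order and committed in reverse order,
// so here the JPA transaction commits first, followed by the Kafka transaction.
@Bean(name = "chainedTransactionManager")
public ChainedTransactionManager chainedTransactionManager(JpaTransactionManager jpaTransactionManager,
                                                           KafkaTransactionManager kafkaTransactionManager) {
    return new ChainedTransactionManager(kafkaTransactionManager, jpaTransactionManager);
}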

You need to annotate your transactional DB + Kafka methods with @Transactional("chainedTransactionManager").

(You can see the issue on the spring-kafka project: https://github.com/spring-projects/spring-kafka/issues/433)
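To make the commit order of the chained setup concrete, here is a minimal plain-Java sketch. It does not use any Spring classes: `ChainedOrderSketch` and `Resource` are hypothetical stand-ins that only imitate "begin in the listed order, commit in reverse order". It also shows what happens if the Kafka commit fails after the DB commit has already gone through, since Kafka does not take part in XA.

```java
import java.util.ArrayList;
import java.util.List;

class ChainedOrderSketch {

    /** Hypothetical stand-in for a transactional resource (not a Spring type). */
    static final class Resource {
        final String name;
        final boolean failOnCommit;
        final List<String> log;

        Resource(String name, boolean failOnCommit, List<String> log) {
            this.name = name;
            this.failOnCommit = failOnCommit;
            this.log = log;
        }

        void begin() {
            log.add("begin " + name);
        }

        void commit() {
            if (failOnCommit) {
                log.add("commit failed " + name);
                throw new IllegalStateException(name + " commit failed");
            }
            log.add("commit " + name);
        }
    }

    /** Begins kafka then db, commits db then kafka; returns the ordered steps. */
    static List<String> run(boolean kafkaCommitFails) {
        List<String> log = new ArrayList<>();
        // Same argument order as new ChainedTransactionManager(kafka, jpa) above.
        List<Resource> chain = List.of(
                new Resource("kafka", kafkaCommitFails, log),
                new Resource("db", false, log));

        for (Resource r : chain) {
            r.begin();
        }
        try {
            // Commit in reverse order: db first, kafka last.
            for (int i = chain.size() - 1; i >= 0; i--) {
                chain.get(i).commit();
            }
        } catch (IllegalStateException e) {
            // The db commit has already happened and cannot be rolled back here.
            log.add("db stays committed");
        }
        return log;
    }

    public static void main(String[] args) {
        run(false).forEach(System.out::println);
        System.out.println("---");
        run(true).forEach(System.out::println);
    }
}
```

In the failing run, the DB row is persisted but the Kafka send is not committed, so the application has to tolerate or compensate for that case itself.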

You say:

From my perspective it looks weird that I send an event via kafkaTemplate, listen to my own event and forward the event using the kafkaTemplate again.

Have you tried this? If so, can you provide an example please?

nader.h, answered Oct 06 '22 12:10


If the listener container is provisioned with a KafkaTransactionManager, the container will create a producer, which will be used by any downstream KafkaTemplate, and the container will send the offsets to the transaction for you.

If the container has some other transaction manager, the container can't send the offsets, since it doesn't have access to the producer (or template).

Another solution is to annotate your listener method with @Transactional (using the DataSource transaction manager) and configure the container with a Kafka transaction manager.

That way, your DB transaction will commit just before the thread returns to the container, which will then send the offsets to the Kafka transaction and commit it.
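The sequence described above can be traced with a small plain-Java sketch. It is only a simulation under stated assumptions: `ContainerFlowSketch`, `deliver` and `listen` are hypothetical names, no Spring or Kafka classes are involved, and the strings merely record which side (container or listener) performs each step.

```java
import java.util.ArrayList;
import java.util.List;

class ContainerFlowSketch {

    /** Simulates the container delivering one record; returns the ordered steps. */
    static List<String> deliver(boolean listenerThrows) {
        List<String> steps = new ArrayList<>();
        // The container owns the Kafka transaction (it has the Kafka TM).
        steps.add("container: begin kafka tx");
        try {
            listen(steps, listenerThrows);
            // Only reached after the listener's db tx has committed.
            steps.add("container: send offsets to kafka tx");
            steps.add("container: commit kafka tx");
        } catch (RuntimeException e) {
            steps.add("container: abort kafka tx");
        }
        return steps;
    }

    /** Stand-in for a listener method annotated @Transactional with the DB TM. */
    static void listen(List<String> steps, boolean fail) {
        steps.add("listener: begin db tx");
        try {
            steps.add("listener: save entity");
            if (fail) {
                throw new IllegalStateException("business error");
            }
            // A template send here would use the container's producer.
            steps.add("listener: send via template (joins kafka tx)");
        } catch (RuntimeException e) {
            steps.add("listener: rollback db tx");
            throw e;
        }
        // The db tx commits when the method returns, before the container resumes.
        steps.add("listener: commit db tx");
    }

    public static void main(String[] args) {
        deliver(false).forEach(System.out::println);
        System.out.println("---");
        deliver(true).forEach(System.out::println);
    }
}
```

In the success path the DB commit always precedes the offset send and the Kafka commit, so a failure in the last step would still leave the DB change in place.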

See the framework test cases for examples.

Gary Russell, answered Oct 06 '22 12:10