I am trying to wrap my head around Kafka transactions and exactly-once.
I have created a transactional consumer and I want to ensure that I read and process all messages for a topic. Kafka still commits the offset if the transaction fails and the message is therefore lost.
More formally, if a stream processing application consumes message A and produces message B such that B = F(A), then exactly once processing means that A is considered consumed if and only if B is successfully produced, and vice versa. Source
Based on this I would assume that message A is not consumed and will therefore be reprocessed again. But how will this message be reprocessed?
I have found plenty of sources stating that if processing fails, then the message is not consumed. But I cannot find any sources mentioning how to reprocess the message if it not consumed. I appears that if a transactional consumer fails then Kafka will rollback but continue to commit the offset such that it can process the next message. But if Kafka commits the offset, then the previous message is lost?
I have found some Spring Kafka pages describing how to handle exceptions in the consumer. So you basically either stop the container or the entire application. I thought Kafka had some internal mechanics to handle this behaviour since the documentation states that message A is only consumed if message B is successfully produced. But if message A is not consumed then Kafka will still commit the offset and continue with the next message.
It feels like exactly-once only applies to cases where errors never happen. I honestly do not care if a message is consumed or not consumed if Kafka will commit the offset regardless. The message will be lost even if the message is not consumed, so it seems like I have to stop the container or application to make sure I do not lose any messages.
https://spring.io/blog/2017/12/01/spring-for-apache-kafka-2-1-0-release-and-1-3-2-2-0-2-available https://docs.spring.io/spring-kafka/reference/html/_reference.html#annotation-error-handling https://github.com/spring-cloud/spring-cloud-stream/issues/1114
Edit 1: I have added some example code below.
I have a very simple Spring boot application.
@SpringBootApplication
@EnableTransactionManagement
public class KafkaTransactionMysteryApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaTransactionMysteryApplication.class, args);
}
@Bean
@Primary
public JpaTransactionManager transactionManager() {
return new JpaTransactionManager();
}
@Bean
public ChainedKafkaTransactionManager chainedTxM(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) {
return new ChainedKafkaTransactionManager(kafka, jpa);
}
@KafkaListener(topics = "trans-topic")
@Transactional(propagation = Propagation.REQUIRED, transactionManager = "chainedTxM", rollbackFor = Exception.class)
public void listen(ConsumerRecord record) throws Exception {
System.out.println(record.value());
if (true) {
throw new Exception("Force rollback");
}
}
}
This is my application properties:
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.transaction-id-prefix=mytrans
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=trans-topic-grp1
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.ack-mode=RECORD
spring.kafka.consumer.enable-auto-commit=false
This is for the log for the above code when sending a kafka message from a console-producer:
2018-06-20 10:17:43.494 DEBUG 47970 --- [ntainer#0-0-C-1] o.s.k.t.KafkaTransactionManager : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@55ef3ce1]]
2018-06-20 10:17:43.494 DEBUG 47970 --- [ntainer#0-0-C-1] o.s.orm.jpa.JpaTransactionManager : Creating new transaction with name [dk.kkflf.kafka.transaction.mystery.KafkaTransactionMysteryApplication$$EnhancerBySpringCGLIB$$67f55772.listen]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; 'chainedTxM',-java.lang.Exception
2018-06-20 10:17:43.537 DEBUG 47970 --- [ntainer#0-0-C-1] o.h.stat.internal.StatisticsInitiator : Statistics initialized [enabled=false]
2018-06-20 10:17:43.541 DEBUG 47970 --- [ntainer#0-0-C-1] o.s.orm.jpa.JpaTransactionManager : Opened new EntityManager [SessionImpl(PersistenceContext[entityKeys=[],collectionKeys=[]];ActionQueue[insertions=ExecutableList{size=0} updates=ExecutableList{size=0} deletions=ExecutableList{size=0} orphanRemovals=ExecutableList{size=0} collectionCreations=ExecutableList{size=0} collectionRemovals=ExecutableList{size=0} collectionUpdates=ExecutableList{size=0} collectionQueuedOps=ExecutableList{size=0} unresolvedInsertDependencies=null])] for JPA transaction
2018-06-20 10:17:43.543 DEBUG 47970 --- [ntainer#0-0-C-1] o.h.e.t.internal.TransactionImpl : begin
2018-06-20 10:17:43.545 DEBUG 47970 --- [ntainer#0-0-C-1] o.s.orm.jpa.JpaTransactionManager : Exposing JPA transaction as JDBC transaction [org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle@4d13cb21]
hello stackoverflow
2018-06-20 10:17:43.545 DEBUG 47970 --- [ntainer#0-0-C-1] o.s.orm.jpa.JpaTransactionManager : Initiating transaction rollback
2018-06-20 10:17:43.545 DEBUG 47970 --- [ntainer#0-0-C-1] o.s.orm.jpa.JpaTransactionManager : Rolling back JPA transaction on EntityManager [SessionImpl(PersistenceContext[entityKeys=[],collectionKeys=[]];ActionQueue[insertions=ExecutableList{size=0} updates=ExecutableList{size=0} deletions=ExecutableList{size=0} orphanRemovals=ExecutableList{size=0} collectionCreations=ExecutableList{size=0} collectionRemovals=ExecutableList{size=0} collectionUpdates=ExecutableList{size=0} collectionQueuedOps=ExecutableList{size=0} unresolvedInsertDependencies=null])]
2018-06-20 10:17:43.545 DEBUG 47970 --- [ntainer#0-0-C-1] o.h.e.t.internal.TransactionImpl : rolling back
2018-06-20 10:17:43.547 DEBUG 47970 --- [ntainer#0-0-C-1] o.s.orm.jpa.JpaTransactionManager : Closing JPA EntityManager [SessionImpl(PersistenceContext[entityKeys=[],collectionKeys=[]];ActionQueue[insertions=ExecutableList{size=0} updates=ExecutableList{size=0} deletions=ExecutableList{size=0} orphanRemovals=ExecutableList{size=0} collectionCreations=ExecutableList{size=0} collectionRemovals=ExecutableList{size=0} collectionUpdates=ExecutableList{size=0} collectionQueuedOps=ExecutableList{size=0} unresolvedInsertDependencies=null])] after transaction
2018-06-20 10:17:43.547 DEBUG 47970 --- [ntainer#0-0-C-1] o.s.orm.jpa.EntityManagerFactoryUtils : Closing JPA EntityManager
2018-06-20 10:17:43.548 DEBUG 47970 --- [ntainer#0-0-C-1] o.s.k.t.KafkaTransactionManager : Initiating transaction rollback
2018-06-20 10:17:43.549 DEBUG 47970 --- [ntainer#0-0-C-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=mytrans0] Transition from state IN_TRANSACTION to ABORTING_TRANSACTION
2018-06-20 10:17:43.549 DEBUG 47970 --- [ntainer#0-0-C-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=mytrans0] Enqueuing transactional request (type=EndTxnRequest, transactionalId=mytrans0, producerId=0, producerEpoch=21, result=ABORT)
2018-06-20 10:17:43.549 DEBUG 47970 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=mytrans0] Not sending EndTxn for completed transaction since no partitions or offsets were successfully added
2018-06-20 10:17:43.549 DEBUG 47970 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=mytrans0] Transition from state ABORTING_TRANSACTION to READY
2018-06-20 10:17:43.550 DEBUG 47970 --- [ntainer#0-0-C-1] o.s.k.t.KafkaTransactionManager : Resuming suspended transaction after completion of inner transaction
2018-06-20 10:17:43.552 ERROR 47970 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = trans-topic, partition = 0, offset = 39, CreateTime = 1529479063351, serialized key size = -1, serialized value size = 19, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello stackoverflow)
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void dk.kkflf.kafka.transaction.mystery.KafkaTransactionMysteryApplication.listen(org.apache.kafka.clients.consumer.ConsumerRecord) throws java.lang.Exception' threw exception; nested exception is java.lang.Exception: Force rollback
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:267) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:80) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1071) [spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1051) [spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:998) [spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:866) [spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:724) [spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_172]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_172]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_172]
Caused by: java.lang.Exception: Force rollback
at dk.kkflf.kafka.transaction.mystery.KafkaTransactionMysteryApplication.listen(KafkaTransactionMysteryApplication.java:39) ~[classes/:na]
at dk.kkflf.kafka.transaction.mystery.KafkaTransactionMysteryApplication$$FastClassBySpringCGLIB$$7b3cab85.invoke(<generated>) ~[classes/:na]
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) ~[spring-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:746) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:294) ~[spring-tx-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98) ~[spring-tx-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at dk.kkflf.kafka.transaction.mystery.KafkaTransactionMysteryApplication$$EnhancerBySpringCGLIB$$1375ca24.listen(<generated>) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_172]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_172]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_172]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_172]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181) ~[spring-messaging-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114) ~[spring-messaging-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:248) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
... 10 common frames omitted
2018-06-20 10:17:43.553 DEBUG 47970 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {trans-topic-0=OffsetAndMetadata{offset=40, metadata=''}}
2018-06-20 10:17:43.553 DEBUG 47970 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {trans-topic-0=OffsetAndMetadata{offset=40, metadata=''}}
2018-06-20 10:17:43.554 DEBUG 47970 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=trans-topic-grp1] Committed offset 40 for partition trans-topic-0
2018-06-20 10:17:43.693 DEBUG 47970 --- [rans-topic-grp1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=trans-topic-grp1] Sending Heartbeat request to coordinator localhost:9092 (id: 2147483647 rack: null)
2018-06-20 10:17:43.696 DEBUG 47970 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=trans-topic-grp1] Received successful Heartbeat response
2018-06-20 10:17:43.747 DEBUG 47970 --- [ntainer#0-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-1, groupId=trans-topic-grp1] Fetch READ_COMMITTED at offset 40 for partition trans-topic-0 returned fetch data (error=NONE, highWaterMark=40, lastStableOffset = 40, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
2018-06-20 10:17:43.747 DEBUG 47970 --- [ntainer#0-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-1, groupId=trans-topic-grp1] Added READ_COMMITTED fetch request for partition trans-topic-0 at offset 40 to node localhost:9092 (id: 0 rack: null)
2018-06-20 10:17:43.748 DEBUG 47970 --- [ntainer#0-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-1, groupId=trans-topic-grp1] Sendin
The source code is also accessible on github https://github.com/kkflf/kafka-transaction-mystery/tree/SO-50922050
Edit 2: My consumer will now rollback on exceptions and not commit the offset to Kafka. (Hurray!). I am now trying to link to together with a JPA transaction to a MySQL Database.
The code below will persist the MyMessage object to my MySQL database but it will rollback the Kafka transaction and replay the same Kafka message again(Which is a good thing). This will lead to duplicate entries in the MySQL database because the Kafka did a rollback but the JPA did not rollback. How do I overcome this situation - It seems like KafkaListener and my Repository does not share the same transaction session?
I have uploaded the latest changes to https://github.com/kkflf/kafka-transaction-mystery/tree/SO-50922050-edit2
This post is becoming very long, so I have pushed the log file here: https://github.com/kkflf/kafka-transaction-mystery/blob/SO-50922050-edit2/log-SO-50922050-edit2.log
Main application:
@SpringBootApplication
@EnableTransactionManagement
public class KafkaTransactionMysteryApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaTransactionMysteryApplication.class, args);
}
@Autowired
private MessageRepository messageRepository;
@Bean
public JpaTransactionManager transactionManager() {
return new JpaTransactionManager();
}
@Bean
@Primary
public ChainedKafkaTransactionManager chainedTxM(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) {
return new ChainedKafkaTransactionManager(kafka, jpa);
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
factory.getContainerProperties().setTransactionManager(chainedTxM(jpa, kafka));
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProps());
}
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "trans-topic-grp1");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@KafkaListener(topics = "trans-topic", containerFactory = "kafkaListenerContainerFactory")
@Transactional(propagation = Propagation.REQUIRED, transactionManager = "chainedTxM", rollbackFor = Exception.class)
public void listen(List<String> records) throws Exception {
for (String record : records) {
MyMessage message = new MyMessage(record);
messageRepository.save(message);
if (record.equals("fail")) {
throw new Exception("Forced rollback - msg: " + record);
}
}
}
}
Repository to persist MyMessage object to the database:
@Repository
public class MessageRepository {
@Autowired
EntityManager em;
//Mandatory to make sure this is never executed outside a transaction
@Transactional(propagation = Propagation.MANDATORY)
public void save(MyMessage message) {
em.persist(message);
}
public MyMessage findById(Long id) {
return em.find(MyMessage.class, id);
}
}
auto.commit.enabled=true
with transactions; when a KafkaTransactionManager
is injected into the container, the container will send the offsets to the transaction if the listener exits normally.AfterRollbackProcessor
. The DefaultAfterRollbackProcessor
is invoked after the transaction is rolled back; it re-seeks all unprocessed records so they will be returned again by Kafka on the next consumer.poll()
.References you find for stopping/starting the container was for older versions of spring-kafka. For example, when NOT using transactions, we now provide the SeekToCurrentErrorHandler
(which performs a similar function to the AfterRollbackProcessor
when using transactions).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With