The new Kafka version (0.11) supports exactly-once semantics.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
I've got a producer set up with Kafka transactional code in Java like this:
producer.initTransactions();
try {
    producer.beginTransaction();
    for (ProducerRecord<String, String> record : payload) {
        producer.send(record);
    }
    // Offsets to commit as part of the same transaction as the sends above.
    Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<>();
    groupCommit.put(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(42L, null));
    producer.sendOffsetsToTransaction(groupCommit, "groupId");
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // This producer has been fenced and cannot recover; the only option is to close it.
    producer.close();
} catch (KafkaException e) {
    // Any other error: abort so neither the records nor the offsets are committed.
    producer.abortTransaction();
}
I'm not quite sure how to use sendOffsetsToTransaction or what its intended use case is. AFAIK, consumer groups are a multithreaded read feature on the consumer end.
The Javadoc says:
"Sends a list of consumed offsets to the consumer group coordinator, and also marks those offsets as part of the current transaction. These offsets will be considered consumed only if the transaction is committed successfully. This method should be used when you need to batch consumed and produced messages together, typically in a consume-transform-produce pattern."
How would the producer maintain a list of consumed offsets? What's the point of it?
This is only relevant to workflows in which you are consuming and then producing messages based on what you consumed. This function allows you to commit offsets you consumed only if the downstream producing succeeds. If you consume data, process it somehow, and then produce the result, this enables transactional guarantees across the consumption/production.
Without transactions, you normally use Consumer#commitSync() or Consumer#commitAsync() to commit consumer offsets. But if you call these methods before you've produced with your producer, you will have committed offsets before knowing whether the producer succeeded in sending.
So, instead of committing your offsets with the consumer, you can use Producer#sendOffsetsToTransaction() on the producer to commit the offsets. This sends the offsets to the consumer group coordinator as part of the current transaction. They are committed only if the entire transaction (consuming and producing) succeeds.
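For the producer to own the offset commits like this, the consumer should not be committing offsets on its own, and the producer needs a transactional id. Here is a minimal configuration sketch using plain `java.util.Properties` and the standard Kafka config keys. The broker address, group id, and transactional id values are placeholders I made up, and serializer/deserializer settings are left out for brevity:

```java
import java.util.Properties;

final class TransactionalConfigSketch {

    // Producer settings that must be in place before initTransactions() is called.
    static Properties producerProps() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092");     // placeholder address
        p.setProperty("transactional.id", "my-transactional-id"); // placeholder id
        p.setProperty("enable.idempotence", "true");              // transactions rely on the idempotent producer
        return p;
    }

    // Consumer settings for the consuming side of consume-transform-produce.
    static Properties consumerProps() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092");     // placeholder address
        p.setProperty("group.id", "groupId");                     // same id passed to sendOffsetsToTransaction
        p.setProperty("enable.auto.commit", "false");             // offsets are committed by the producer instead
        p.setProperty("isolation.level", "read_committed");       // only read records from committed transactions
        return p;
    }
}
```

The `group.id` here is the value you would pass as the second argument of sendOffsetsToTransaction, which is how the producer knows which consumer group the offsets belong to.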
(Note: when you send the offsets to commit, you should add 1 to the last offset read, so that future reads resume from the first offset you haven't read yet. This is true regardless of whether you commit with the consumer or the producer. See: KafkaProducer sendOffsetsToTransaction need offset+1 to successfully commit current offset.)
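To make the "+1" rule concrete, here is a small sketch that works out the offsets to commit for a batch of consumed records. It is plain Java with no Kafka dependency: `Consumed` is a hypothetical stand-in for the topic, partition, and offset of a ConsumerRecord, and the string key stands in for TopicPartition.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class NextOffsets {

    // Hypothetical stand-in for the parts of a consumed record that matter here.
    static final class Consumed {
        final String topic;
        final int partition;
        final long offset;

        Consumed(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // For each "topic-partition" key, returns the offset to commit:
    // the highest offset consumed in the batch, plus one.
    static Map<String, Long> offsetsToCommit(List<Consumed> batch) {
        Map<String, Long> next = new HashMap<>();
        for (Consumed r : batch) {
            String key = r.topic + "-" + r.partition;
            next.merge(key, r.offset + 1, Long::max);
        }
        return next;
    }
}
```

With the real client, the same idea would fill a Map<TopicPartition, OffsetAndMetadata> with new OffsetAndMetadata(record.offset() + 1) and pass that map to sendOffsetsToTransaction.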