I have implemented a simple Kafka Dead letter record processor.
It works perfectly when using records produced from the Console producer.
However, I find that our Kafka Streams applications do not guarantee that offsets increase by 1 for each record produced to the sink topics.
Dead Letter Processor Background:
I have a scenario where a record may be received before all the data required to process it has been published. When records are not matched for processing by the streams app, they are moved to a Dead Letter topic instead of continuing to flow downstream. When new data is published, we dump the latest messages from the Dead Letter topic back into the stream application's source topic for reprocessing with the new data.
The Dead Letter processor:
Uses KafkaProducer#sendOffsetsToTransaction to commit the last produced offsets.
To track when all records in my range have been processed for a topic's partition, my service compares its last produced offset from the producer to the consumer's saved map of ending offsets. When it reaches the ending offset, the consumer pauses that partition via KafkaConsumer#pause, and when all partitions are paused (meaning they have reached the saved ending offset), it exits.
The Kafka Consumer API states:
Offsets and Consumer Position Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of a record within that partition, and also denotes the position of the consumer in the partition. For example, a consumer which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5.
The Kafka Producer API documentation also implies that the next offset is always +1:
Sends a list of specified offsets to the consumer group coordinator, and also marks those offsets as part of the current transaction. These offsets will be considered committed only if the transaction is committed successfully. The committed offset should be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.
But my debugger clearly shows that the offsets of the records consumed from a single partition do anything but increment by 1 at a time...
I thought maybe this was a Kafka configuration issue, such as max.message.bytes, but none of the settings really made sense.
Then I thought perhaps it was caused by joining, but I didn't see any way that would change how the producer functions.
Not sure if it is relevant or not but all of our Kafka applications are using Avro and Schema Registry...
Should the offsets always increment by 1 regardless of the method of producing, or is it possible that the Kafka Streams API does not offer the same guarantees as the normal Producer and Consumer clients?
Is there something that I am missing entirely?
As Kafka adds each record to a partition, it assigns a unique sequential ID called an offset.
Kafka stores offset commits in a topic. When a consumer commits an offset, Kafka publishes a commit message to a "commit-log" topic and keeps an in-memory structure that maps group/topic/partition to the latest offset for fast retrieval.
Kafka consumers can commit an offset to a partition. If the offset is committed successfully, after the consumer restarts, it can continue consuming from the committed offset. Kafka's offset is continuous as it follows the following constraints: The first message's offset is 0.
It commits the offset, indicating that all the previous records from that partition have been processed. So, if a consumer stops and comes back later, it restarts from the last committed position (if assigned to that partition again).
It is not an official API contract that message offsets are increased by one, even if the JavaDocs indicate this (it seems that the JavaDocs should be updated).
If you don't use transactions, you get either at-least-once semantics or no guarantees (some call this at-most-once semantics). With at-least-once, a record might be written twice, so the offsets of two consecutive messages do not necessarily differ by one, because the duplicate write "consumes" two offsets.
If you use transactions, each commit (or abort) of a transaction writes a commit (or abort) marker into the topic -- those transactional markers also "consume" one offset (this is what you observe).
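To make the gap concrete, here is a toy model of a single partition's log. It is only a sketch and does not use the Kafka client library; the class `PartitionLogSketch` and its methods are hypothetical names invented for illustration. It assumes each transaction ends with exactly one marker in the partition.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one partition's log. This is NOT the Kafka client API.
class PartitionLogSketch {
    private long nextOffset = 0;
    private final List<Long> dataOffsets = new ArrayList<>();

    // Appends one data record and returns the offset assigned to it.
    long appendRecord() {
        long offset = nextOffset++;
        dataOffsets.add(offset);
        return offset;
    }

    // A commit or abort marker takes an offset but is never handed to application code.
    void appendTransactionMarker() {
        nextOffset++;
    }

    // Offsets that application code would actually see on consumed records.
    List<Long> visibleOffsets() {
        return new ArrayList<>(dataOffsets);
    }

    // The offset the next write would receive.
    long endOffset() {
        return nextOffset;
    }

    public static void main(String[] args) {
        PartitionLogSketch log = new PartitionLogSketch();
        // Two transactions of two records each, each closed by a commit marker.
        log.appendRecord();
        log.appendRecord();
        log.appendTransactionMarker();
        log.appendRecord();
        log.appendRecord();
        log.appendTransactionMarker();
        System.out.println(log.visibleOffsets()); // prints [0, 1, 3, 4]
        System.out.println(log.endOffset());      // prints 6
    }
}
```

In this model the record offsets skip 2 and 5, and the end offset (6) is two past the last visible record (4), which matches the kind of jump described in the question.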
Thus, in general, you should not rely on consecutive offsets. The only guarantee you get is that each offset is unique within a partition.
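For a range-bounded processor like the one in the question, one way to avoid depending on consecutive offsets is to compare a position against the saved ending offset with `>=` instead of expecting the last record to sit at exactly `endOffset - 1`. The sketch below is plain Java with hypothetical names (`RangeCompletionSketch`, `doneByExactMatch`, `doneByPosition`, `allDone`). It assumes the positions would come from something like `KafkaConsumer#position` and the ending offsets from `KafkaConsumer#endOffsets`; how you wire those in is left open.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a completion check that tolerates gaps in offsets.
class RangeCompletionSketch {

    // Fragile: assumes the last data record sits exactly at endOffset - 1.
    static boolean doneByExactMatch(long lastRecordOffset, long endOffset) {
        return lastRecordOffset + 1 == endOffset;
    }

    // Tolerant: done once the position has reached or passed the saved end offset.
    static boolean doneByPosition(long position, long endOffset) {
        return position >= endOffset;
    }

    // True when every partition in the saved end-offset map is done.
    // Assumption: a partition with no recorded position is treated as position 0.
    static boolean allDone(Map<Integer, Long> positions, Map<Integer, Long> endOffsets) {
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            long position = positions.getOrDefault(entry.getKey(), 0L);
            if (!doneByPosition(position, entry.getValue())) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Example: last data record at offset 4, a marker at offset 5, saved end offset 6.
        System.out.println(doneByExactMatch(4L, 6L)); // prints false
        System.out.println(doneByPosition(6L, 6L));   // prints true

        Map<Integer, Long> endOffsets = new HashMap<>();
        endOffsets.put(0, 6L);
        Map<Integer, Long> positions = new HashMap<>();
        positions.put(0, 6L);
        System.out.println(allDone(positions, endOffsets)); // prints true
    }
}
```

With the exact-match check, a partition whose last offset belongs to a transaction marker would never be reported as finished. The position-based check only needs offsets to be increasing, not consecutive.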