I read the documentation on the Kafka website, but after trying to implement a complete minimal example (producer --> Kafka --> consumer), it's still not clear to me how the "consumer state", i.e. the offset, is supposed to be handled.
Some info
Now, the documentation says that the high-level API consumer stores its state in ZooKeeper, so I would expect the offset, and therefore the state of the consumer, to be maintained between restarts.
But unfortunately it isn't: each time I restart the broker or the consumer, all messages are re-delivered. These are probably naive questions, but:
In case of a Kafka restart: I understood that it is up to the consumer to keep its own state, so presumably when the broker (re)starts it re-delivers all (!) messages and the consumer decides what to consume... is that right? If so, what happens if I have 10.0000.0000 messages?
In case of a consumer JVM restart: if the state is kept in ZooKeeper, why are the messages re-delivered? Is it possible that the new JVM has a different consumer "identity"? And if so, how can I bind it to the previous identity?
On the application side, if the consumer is pulling data, then you can tell that it is running. On the Kafka broker side, you can tell consumers are running by looking at consumer group lag monitoring data: if the lag stays near zero, the consumers are running and keeping up.
Kafka consumers are the applications that retrieve data from the Kafka brokers to which producers publish messages. To fetch those messages, a consumer has to subscribe to the relevant topics on the brokers.
If the consumer crashes or is shut down, its partitions will be re-assigned to another member, which will begin consumption from the last committed offset of each partition. If the consumer crashes before any offset has been committed, then the consumer which takes over its partitions will use the reset policy.
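For illustration, here is a minimal sketch of that behaviour using the newer Java consumer API (kafka-clients 2.x or later); the broker address localhost:9092, the topic my-topic and the group name my-group are placeholders:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MinimalConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.put("group.id", "my-group");                   // fixed group id: committed offsets survive restarts
        props.put("enable.auto.commit", "false");            // offsets are committed explicitly below
        props.put("auto.offset.reset", "earliest");          // reset policy, used only when no committed offset exists
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));   // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
                consumer.commitSync();   // after a restart or rebalance, consumption resumes from the last committed offset
            }
        }
    }
}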
Yes, the consumer is responsible for keeping track of its state, and the Java high-level consumer saves its state (the consumed offsets) in ZooKeeper.
Most likely you didn't specify the group.id configuration property. In that situation Kafka generates a random group id, so offsets committed under a previous run's id are never reused.
It's also possible that you turned off the auto.commit.enable configuration property.
A full reference of the Kafka configuration can be found at http://kafka.apache.org/configuration.html under the heading "Important configuration properties for the high-level consumer".
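For illustration, a minimal sketch of the relevant properties for the 0.8.x high-level consumer (the ZooKeeper address and group name are placeholders; auto.commit.enable is shown with its default value of true):

import java.util.Properties;

import kafka.consumer.ConsumerConfig;

public class HighLevelConsumerSettings {
    public static ConsumerConfig build() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");   // placeholder ZooKeeper address
        props.put("group.id", "my-consumer-group");         // stable group id: offsets in ZooKeeper are stored per group
        props.put("auto.commit.enable", "true");            // periodically commit the consumed offset to ZooKeeper
        props.put("auto.commit.interval.ms", "1000");       // how often to commit, in milliseconds
        return new ConsumerConfig(props);
    }
}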
To answer the original question: using a fixed group.id helps avoid the "re-consuming all messages from the beginning of time" situation.
If you change the group.id, you'll get all messages from the moment the topic was created (or since the last data purge, per Kafka's log retention policy).
Don't confuse this with the kafka-console-consumer "--from-beginning" flag (which sets the auto.offset.reset option); that flag is there to choose between options 1 and 2 below:
1) "smallest": when the group has no committed offset yet (or the committed offset is out of range), start from the earliest message still retained in the log:
props.put("auto.offset.reset","smallest");
2) "largest": when the group has no committed offset, start from the latest offset, i.e. only messages produced after the consumer starts (in this case you risk missing messages put on the topic while the subscriber was down and not listening):
props.put("auto.offset.reset","largest");
Once the group has committed offsets, the consumer resumes from the last committed offset regardless of this setting.
Side note: the following is only tangentially related to the original question.
For a more advanced use case - programmatically setting the consumer offset in order to replay messages starting from a certain time - you would need the SimpleConsumer API, as shown in https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example, to find the right offset to replay from on the right broker/partition. That essentially means replacing ZooKeeper with your own FindLeader logic, which is very tricky.
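For illustration, a rough sketch of just the offset-lookup step from that wiki example (host, port, topic and client name are placeholders, and it assumes you have already found the lead broker for the partition):

import java.util.Collections;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetLookup {
    // Asks the lead broker for the last offset before the given timestamp
    // (kafka.api.OffsetRequest.EarliestTime() / LatestTime() can be passed instead of a timestamp).
    public static long offsetBefore(SimpleConsumer consumer, String topic, int partition,
                                    long timestampMs, String clientName) {
        TopicAndPartition tp = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                Collections.singletonMap(tp, new PartitionOffsetRequestInfo(timestampMs, 1));
        OffsetRequest request = new OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            throw new RuntimeException("Offset lookup failed, error code: "
                    + response.errorCode(topic, partition));
        }
        return response.offsets(topic, partition)[0];
    }

    public static void main(String[] args) {
        // Placeholder lead broker for the partition; finding it is the "FindLeader" logic mentioned above.
        SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "offsetLookup");
        long offset = offsetBefore(consumer, "my-topic", 0,
                System.currentTimeMillis() - 3600 * 1000L, "offsetLookup");
        System.out.println("Replay from offset " + offset);
        consumer.close();
    }
}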
For this use case (ad-hoc replay of messages starting from a certain user-specified time) we decided to keep a local cache of the messages and manage offsets locally, instead of using the Kafka offset-management API (which would have required reimplementing a good chunk of the ZooKeeper functionality with SimpleConsumer).
In other words, we treat Kafka as a "postman": once a message is delivered it goes into a local mailbox, and if we need to go back to a certain offset in the past and replay already-consumed messages (e.g. after a consumer application error), we don't go back to the "post office" (the Kafka brokers) to figure out the correct delivery ordering; we manage it locally.
End of side note.