Apache Kafka: consumer state

I read the documentation on the Kafka website, but after implementing a complete minimal example (producer --> Kafka --> consumer) it is still not clear to me how the "consumer state", i.e. the offset, needs to be handled.

Some info

  1. I'm using the HighLevel API (Java)
  2. My consumer is a simple class with a Main, basically the same that can be found on the "quickstart" Kafka page
  3. I'm using Zookeeper
  4. I'm using a single broker

Now, the documentation says that the high-level API consumer stores its state in ZooKeeper, so I would expect the offset, and therefore the state of the consumer, to be maintained across

  • Kafka broker restarts
  • Consumer restarts

But unfortunately it doesn't: each time I restart the broker or the consumer, all messages are re-delivered. Now, these are probably stupid questions, but:

  1. In case of a Kafka restart: I understood that it is up to the consumer to keep its state, so probably when the broker (re)starts it redelivers all (!) messages and the consumer decides what to consume... is that right? If so, what happens if I have 10.0000.0000 messages?

  2. In case of a consumer JVM restart: if the state is kept in ZooKeeper, why are the messages re-delivered? Is it possible that the new JVM has a different consumer "identity"? And in that case, how can I bind it to the previous identity?

asked Feb 07 '13 by Andrea




2 Answers

Yes, the consumer is responsible for keeping its state, and the Java high-level consumer saves its state in ZooKeeper.

Most likely you didn't specify the group.id configuration property. In that situation Kafka generates a random group id, so every restart looks like a brand-new consumer group with no stored offsets.

It's also possible that you turned off autocommit.enable configuration property.

A full reference of the Kafka configuration can be found at http://kafka.apache.org/configuration.html under the "Important configuration properties for the high-level consumer" heading.
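A minimal sketch of the properties in question (the helper name `buildConsumerProps` is invented for illustration; I'm assuming the 0.8-style property names `group.id` / `zookeeper.connect` here, older releases used `groupid` / `zk.connect`; in a real application these Properties would be passed to `kafka.consumer.Consumer.createJavaConsumerConnector`):

```java
import java.util.Properties;

public class ConsumerPropsDemo {
    // Builds the configuration for the old high-level consumer.
    // A stable group.id is what lets ZooKeeper associate committed
    // offsets with this consumer across broker and JVM restarts.
    static Properties buildConsumerProps(String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // assumed ZK address
        props.put("group.id", groupId);                   // stable consumer identity
        props.put("auto.commit.enable", "true");          // periodically commit offsets to ZK
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConsumerProps("my-consumer-group");
        System.out.println("group.id=" + props.getProperty("group.id"));
        // With these props you would then create the connector, e.g.:
        // ConsumerConnector c = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    }
}
```

As long as `group.id` stays the same across restarts, the consumer resumes from its last committed offset instead of re-reading everything.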

answered Oct 20 '22 by Wildfire


To answer the original question: using a stable groupId helps avoid the "re-consuming all messages from the beginning of time" situation.

If you change the groupId, you'll get all messages from the moment the queue was created (or since the last data purge, per Kafka's log retention policy).

Don't confuse this with the kafka-console-consumer "--from-beginning" flag (which sets the auto.offset.reset option); that flag is there to choose between options 1 and 2 below:

1) when no committed offset exists for the group, start from the smallest (earliest) offset still retained in the log, i.e. replay all retained messages:

props.put("auto.offset.reset","smallest");

2) when no committed offset exists, start from the largest (latest) offset, i.e. consume only messages produced after the subscriber JVM starts (in this case you risk missing messages put on the queue while the subscriber was down and not listening):

props.put("auto.offset.reset","largest");

Note that auto.offset.reset only kicks in when there is no committed offset (or the committed offset is out of range); once the group has committed offsets, consumption resumes from the last committed offset regardless of this setting.
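The reset rule can be illustrated with a small pure-Java sketch (no broker involved; the method name `resolveStartOffset` and the offset values are invented for illustration, this only models the decision the consumer makes):

```java
import java.util.OptionalLong;

public class OffsetResetDemo {
    // Mimics how the high-level consumer picks its starting offset:
    // a committed offset always wins; auto.offset.reset only applies
    // when no committed offset exists for the group.
    static long resolveStartOffset(OptionalLong committed,
                                   long smallest, long largest,
                                   String autoOffsetReset) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        return "smallest".equals(autoOffsetReset) ? smallest : largest;
    }

    public static void main(String[] args) {
        long smallest = 100L, largest = 500L; // offsets still retained in the log
        // Fresh group, reset=smallest -> replay the retained log from 100
        System.out.println(resolveStartOffset(OptionalLong.empty(), smallest, largest, "smallest"));
        // Fresh group, reset=largest -> only new messages from 500 on
        System.out.println(resolveStartOffset(OptionalLong.empty(), smallest, largest, "largest"));
        // Committed offset present -> the reset policy is ignored
        System.out.println(resolveStartOffset(OptionalLong.of(321L), smallest, largest, "largest"));
    }
}
```

This is why a randomly generated groupId re-delivers everything: with no committed offset, the reset policy decides, and "smallest" means the whole retained log.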


side note: below is only tangentially related to the original question

For a more advanced use case - if you're trying to programmatically set the consumer offset to replay messages starting from a certain time - you would need the SimpleConsumer API, as shown in https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example, in order to find the smallest offset to replay from the right broker/partition. This essentially means replacing ZooKeeper with your own FindLeader logic. Very tricky.

For this use case (ad-hoc replay of messages starting from a certain user-specified time) we decided to store a local cache of the messages and manage offsets locally, instead of using the Kafka offset management API (which would have required reimplementing a good chunk of ZooKeeper functionality with SimpleConsumer).

I.e. we treat Kafka as a "postman": once a message is delivered it goes into a local mailbox. If we need to go back to a certain offset in the past and replay messages that have already been consumed (e.g. after a consumer app error), we don't go back to the "post office" (the Kafka brokers) to figure out the correct delivery ordering; we manage it locally.
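A minimal sketch of such local offset bookkeeping (purely illustrative; the class and method names are invented, and a real implementation would persist the map to disk alongside the cached messages and handle concurrency):

```java
import java.util.HashMap;
import java.util.Map;

public class LocalOffsetStore {
    // Maps "topic-partition" -> next offset to consume.
    private final Map<String, Long> offsets = new HashMap<>();

    // Record that everything up to and including `offset` was consumed.
    public void commit(String topic, int partition, long offset) {
        offsets.put(topic + "-" + partition, offset + 1);
    }

    // Where to resume; defaults to 0 for an unseen partition.
    public long nextOffset(String topic, int partition) {
        return offsets.getOrDefault(topic + "-" + partition, 0L);
    }

    // Ad-hoc replay: rewind to an earlier offset kept in the local cache.
    public void rewindTo(String topic, int partition, long offset) {
        offsets.put(topic + "-" + partition, offset);
    }

    public static void main(String[] args) {
        LocalOffsetStore store = new LocalOffsetStore();
        store.commit("events", 0, 41);                     // consumed up to offset 41
        System.out.println(store.nextOffset("events", 0)); // resume at 42
        store.rewindTo("events", 0, 10);                   // replay from offset 10
        System.out.println(store.nextOffset("events", 0));
    }
}
```

Since replay is served from the local message cache, rewinding never touches the brokers at all.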

end of side note

answered Oct 20 '22 by alex