We have an application in which a consumer reads a message and the thread then does a number of things, including database accesses, before a message is produced to another topic. The time between consuming and producing the message on the thread can be several minutes. Once the message is produced to the new topic, a commit is done to indicate that we are done with the work on the consumed message. Auto commit is disabled for this reason.
I'm using the high-level consumer, and what I'm noticing is that the ZooKeeper and Kafka sessions time out because it takes too long before we do anything on the consumer queue. As a result, Kafka ends up rebalancing every time the thread goes back to read more from the consumer queue, and after a while it starts to take a long time before a consumer reads a new message.
I can set the ZooKeeper session timeout very high so that this is not a problem, but then I have to adjust the rebalance parameters accordingly, and Kafka won't pick up a new consumer for a while, among other side effects.
What are my options to solve this problem? Is there a way to heartbeat to Kafka and ZooKeeper to keep both happy? Do I still have these same issues if I were to use a simple consumer?
The consumer session timeout is a crucial configuration for group stability. When there is a member failure, the group must pause in order to rebalance partitions. If the failure is spurious, then we often get two rebalances: one for the member failure and one for the failed member when it rejoins.
Resolution: the default timeout is 1 minute. To change it, open the Kafka Client Configuration > Producer tab > Advance Properties, add max.block.ms, and set it to the desired value (in milliseconds).
If the consumer crashes or is shut down, its partitions will be re-assigned to another member, which will begin consumption from the last committed offset of each partition. If the consumer crashes before any offset has been committed, then the consumer which takes over its partitions will use the reset policy.
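As a rough illustration of the takeover rule described above, the starting position of the consumer that inherits a partition can be modeled like this. The function `resume_position` and its parameters are hypothetical and not part of any Kafka client; the `earliest` and `latest` values are assumed to mirror the usual reset policies.

```python
from typing import Optional


def resume_position(committed: Optional[int], log_start: int, log_end: int,
                    reset_policy: str = "latest") -> int:
    """Return the offset a consumer starts from after taking over a partition.

    Hypothetical helper for illustration only; not a Kafka client API.
    """
    if committed is not None:
        # A committed offset exists, so consumption resumes from it.
        return committed
    # Nothing was ever committed: fall back to the reset policy.
    if reset_policy == "earliest":
        return log_start
    if reset_policy == "latest":
        return log_end
    raise ValueError(f"unsupported reset policy: {reset_policy!r}")
```

Anything processed after the last commit but before the crash is read again by the new owner, which is why the commit in the question is done only after the message has been produced to the second topic.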
heartbeat.interval.ms: The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group.
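A small sanity check for the two related settings might look like the sketch below. It assumes the commonly cited guideline that the heartbeat interval should be no more than one third of the session timeout; the function name `heartbeat_config_ok` is hypothetical, and the values are plain integers rather than a real client configuration object.

```python
def heartbeat_config_ok(session_timeout_ms: int, heartbeat_interval_ms: int) -> bool:
    """Check a heartbeat interval against a session timeout.

    Assumption: applies the guideline heartbeat <= session timeout / 3,
    so that several heartbeats fit inside one session window.
    Hypothetical helper; not part of any Kafka client.
    """
    if session_timeout_ms <= 0 or heartbeat_interval_ms <= 0:
        raise ValueError("timeouts must be positive")
    # Multiply instead of dividing to avoid integer rounding issues.
    return heartbeat_interval_ms * 3 <= session_timeout_ms
```

For example, `heartbeat_config_ok(30000, 3000)` passes, while `heartbeat_config_ok(10000, 5000)` does not, because a single missed heartbeat would already use up half the session window.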
It sounds like your problems boil down to relying on the high-level consumer to manage the last-read offset. Using a simple consumer would solve that problem, since you control the persistence of that offset. Note that all the high-level consumer commit does is store the last-read offset in ZooKeeper. No other action is taken, and the message you just read is still there in the partition and is readable by other consumers.
With the Kafka simple consumer, you have much more control over when and how that offset storage takes place. You can even persist that offset somewhere other than ZooKeeper (a database, for example).
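To make the "persist the offset yourself" idea concrete, here is a minimal sketch of an offset store backed by SQLite. The class `OffsetStore`, its table layout, and the topic name in the usage note are all hypothetical; the code does not talk to Kafka, and a real simple-consumer loop would call `load` before fetching and `save` after the long-running work has finished.

```python
import sqlite3
from typing import Optional


class OffsetStore:
    """Hypothetical offset store backed by SQLite; not a Kafka client API."""

    def __init__(self, path: str = ":memory:") -> None:
        self._conn = sqlite3.connect(path)
        with self._conn:
            self._conn.execute(
                "CREATE TABLE IF NOT EXISTS offsets ("
                " topic TEXT NOT NULL,"
                " partition_id INTEGER NOT NULL,"
                " next_offset INTEGER NOT NULL,"
                " PRIMARY KEY (topic, partition_id))"
            )

    def save(self, topic: str, partition: int, next_offset: int) -> None:
        # The connection context manager commits on success, rolls back on error.
        with self._conn:
            self._conn.execute(
                "INSERT OR REPLACE INTO offsets (topic, partition_id, next_offset)"
                " VALUES (?, ?, ?)",
                (topic, partition, next_offset),
            )

    def load(self, topic: str, partition: int) -> Optional[int]:
        # Returns None when no offset has been stored for this partition yet.
        row = self._conn.execute(
            "SELECT next_offset FROM offsets WHERE topic = ? AND partition_id = ?",
            (topic, partition),
        ).fetchone()
        return row[0] if row else None
```

A consumer thread could then call `store.save("orders", 0, offset + 1)` only after the outgoing message has been produced, so a restart resumes from the first unprocessed message no matter how long the processing took.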
The bad news is that while the simple consumer itself is simpler than the high-level consumer, there's a lot more work you have to do code-wise to make it work. You'll also have to write code to access multiple partitions - something the high-level consumer does quite nicely for you.