I'm trying to understand which is the better option for handling records that take a long time to process in a Kafka consumer. I ran a few tests and observed that we can control this by modifying either max.poll.records or max.poll.interval.ms.
Now my question is, what's the better option to choose? Please suggest.
If you have slow processing and want to avoid rebalances, then tuning either would achieve that. However, extending max.poll.interval.ms to allow longer gaps between polls does have a bit of a side effect.
Each consumer uses only two threads: a polling thread and a heartbeat thread.
The latter lets the group know that your application is still alive, so if the application dies the group can notice the missing heartbeats and trigger a rebalance without waiting for max.poll.interval.ms to expire.
The polling thread does everything else in terms of group communication: it is during the poll method that you find out whether a rebalance has been triggered elsewhere, or whether a partition leader has died and a metadata refresh is required. The implication is that if you allow longer gaps between polls, the group as a whole is slower to respond to change. For example, no consumer starts receiving messages after a rebalance until all of them have received their new partitions, so if a rebalance occurs just after one consumer has started processing a batch that takes 10 minutes, all consumers will be hanging around for at least that long.
Hence for a more responsive group in situations where processing of messages is expected to be slow you should choose to reduce the records fetched in each batch.
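As a rough illustration of that advice, here is a minimal sketch of a consumer configuration that shrinks the batch and leaves max.poll.interval.ms at its client default. The class name, method name, broker address, group id and the value 10 are all made up for the example; only the property keys are real Kafka consumer settings. It uses plain java.util.Properties, so it compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

class SlowConsumerConfig {
    // Hypothetical helper: builds consumer properties that favour small
    // batches over a long poll interval.
    static Properties smallBatchProps(String bootstrapServers, String groupId, int maxPollRecords) {
        if (maxPollRecords <= 0) {
            throw new IllegalArgumentException("maxPollRecords must be positive");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Fewer records per poll() means each batch finishes sooner,
        // so the consumer gets back to poll() and sees rebalances earlier.
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        // max.poll.interval.ms is deliberately not set, so the client default applies.
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address and group id, chosen only for illustration.
        Properties props = smallBatchProps("localhost:9092", "slow-workers", 10);
        System.out.println(props.getProperty("max.poll.records"));
    }
}
```

The resulting Properties object would normally be passed to the KafkaConsumer constructor together with the deserializer settings, which are omitted here.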
max.poll.records simply defines the maximum number of records returned in a single call to poll().
max.poll.interval.ms, on the other hand, defines the maximum allowed delay between calls to poll().
max.poll.interval.ms:
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. For consumers using a non-null group.instance.id which reach this timeout, partitions will not be immediately reassigned. Instead, the consumer will stop sending heartbeats and partitions will be reassigned after expiration of session.timeout.ms. This mirrors the behavior of a static consumer which has shutdown.
I believe you can tune both in order to get the expected behaviour. For example, you could measure the average processing time per message. If it is, say, 1 second and you have max.poll.records=100, a full batch takes about 100 seconds to process, so you should allow somewhat more than 100 seconds (max.poll.interval.ms above 100000) for the poll interval.
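The arithmetic can be sketched in a few lines of Java. This is only a back-of-the-envelope estimate: the class and method names and the safety factor are assumptions made for the example, not part of any Kafka API, and real processing times vary around the average, which is why some headroom is added.

```java
class PollIntervalEstimate {
    // Estimated max.poll.interval.ms for a full batch:
    // average time per record * records per poll * safety factor.
    static long estimateMaxPollIntervalMs(long avgProcessingMs, int maxPollRecords, double safetyFactor) {
        checkArgs(avgProcessingMs, safetyFactor);
        if (maxPollRecords <= 0) {
            throw new IllegalArgumentException("maxPollRecords must be positive");
        }
        return (long) Math.ceil(avgProcessingMs * (double) maxPollRecords * safetyFactor);
    }

    // The inverse view: the largest batch that fits in a given poll interval.
    static int maxRecordsForInterval(long maxPollIntervalMs, long avgProcessingMs, double safetyFactor) {
        checkArgs(avgProcessingMs, safetyFactor);
        return (int) Math.floor(maxPollIntervalMs / (avgProcessingMs * safetyFactor));
    }

    private static void checkArgs(long avgProcessingMs, double safetyFactor) {
        if (avgProcessingMs <= 0 || safetyFactor < 1.0) {
            throw new IllegalArgumentException("need avgProcessingMs > 0 and safetyFactor >= 1.0");
        }
    }

    public static void main(String[] args) {
        // 1 second per record, 100 records per poll, 50% headroom (assumed value).
        System.out.println(estimateMaxPollIntervalMs(1000, 100, 1.5)); // prints 150000
        // How many 1-second records fit in a 300000 ms interval with the same headroom.
        System.out.println(maxRecordsForInterval(300_000, 1000, 1.5)); // prints 200
    }
}
```

With the numbers from the example above (1 second per record, 100 records), a factor of 1.5 gives 150 seconds. Read the other way round, a 5-minute interval would leave room for about 200 such records per poll.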