Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Should we use max.poll.records or max.poll.interval.ms to handle records that take longer to process in kafka consumer?

I'm trying to understand what is better option to handle records that take longer to process in kafka consumer? I ran few tests to understand this and observed that we can control this with by modifying either max.poll.records or max.poll.interval.ms.

Now my question is, what's the better option to choose? Please suggest.

like image 622
Raj Avatar asked Jan 25 '23 03:01

Raj


2 Answers

If you have slow processing and so want to avoid rebalances then tuning either would achieve that. However extending max.poll.interval.ms to allow for longer gaps between poll does have a bit of a side effect.

Each consumer only uses 2 threads - polling thread and heartbeat thread.

The latter lets the group know that your application is still alive so can trigger a rebalance before max.poll.interval.ms expires.

The polling thread does everything else in terms of group communication so during the poll method you find out if a rebalance has been triggered elsewhere, you find out if a partition leader has died and hence metadata refresh is required. The implication is that if you allow longer gaps between polls then the group as a whole is slower to respond to change (for example no consumers start receiving messages after a rebalance until they have all received their new partitions - if a rebalance occurs just after one consumer has started processing a batch for 10 minutes then all consumers will be hanging around for at least that long).

Hence for a more responsive group in situations where processing of messages is expected to be slow you should choose to reduce the records fetched in each batch.

like image 44
Chris Avatar answered Feb 05 '23 15:02

Chris


max.poll.records simply defines the maximum number of records returned in a single call to poll().

Now max.poll.interval.ms defines the delay between the calls to poll().

max.poll.interval.ms: The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. For consumers using a non-null group.instance.id which reach this timeout, partitions will not be immediately reassigned. Instead, the consumer will stop sending heartbeats and partitions will be reassigned after expiration of session.timeout.ms. This mirrors the behavior of a static consumer which has shutdown.


I believe you can tune both in order to get to the expected behaviour. For example, you could compute the average processing time for the messages. If the average processing time is say 1 second and you have max.poll.records=100 then you should allow approximately 100+ seconds for the poll interval.

like image 97
Giorgos Myrianthous Avatar answered Feb 05 '23 16:02

Giorgos Myrianthous