
Kafka Consumer Rebalancing Algorithm

Can someone please tell me what the rebalancing algorithm is for Kafka consumers? I would like to understand how partition count and consumer threads affect this.

Thank you,

NSA asked Feb 18 '15 00:02



2 Answers

OK, so there are 2 rebalancing algorithms at the moment: Range and RoundRobin. They are also called partition assignment strategies.

For simplicity, assume we have a topic T1 with 10 partitions and 2 consumers with different configurations (to make the example clearer): C1 with num.streams set to 1 and C2 with num.streams set to 2.

Here's how that would work with the Range strategy:

Range lays out the available partitions in numeric order and the consumer threads in lexicographic order. So in our case the order of partitions will be 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 and the order of consumer threads will be C1-0, C2-0, C2-1. Then the number of partitions is divided by the number of consumer threads to determine how many partitions each consumer thread should own. In our case it doesn't divide evenly (10 / 3 is 3 with a remainder of 1), so the first thread, C1-0, will get one extra partition. The final partition assignment would look like this:

C1-0 gets partitions 0, 1, 2, 3
C2-0 gets partitions 4, 5, 6
C2-1 gets partitions 7, 8, 9

If there were 11 partitions, the remainder would be 2, so the first two threads would each get one extra partition:

C1-0 would get partitions 0, 1, 2, 3
C2-0 would get partitions 4, 5, 6, 7
C2-1 would get partitions 8, 9, 10
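
To make the arithmetic concrete, here is a minimal Python sketch of the Range logic as described above. It is not Kafka's actual code; the function name `range_assign` and its parameters are made up for illustration, and it assumes partitions are simply numbered from 0.

```python
def range_assign(num_partitions, threads):
    """Hypothetical helper: split partitions 0..num_partitions-1 Range-style."""
    ordered = sorted(threads)  # consumer threads in lexicographic order
    per_thread, extra = divmod(num_partitions, len(ordered))
    assignment = {}
    start = 0
    for index, thread in enumerate(ordered):
        # The first `extra` threads each take one additional partition.
        count = per_thread + (1 if index < extra else 0)
        assignment[thread] = list(range(start, start + count))
        start += count
    return assignment


print(range_assign(10, ["C1-0", "C2-0", "C2-1"]))
# {'C1-0': [0, 1, 2, 3], 'C2-0': [4, 5, 6], 'C2-1': [7, 8, 9]}
print(range_assign(11, ["C1-0", "C2-0", "C2-1"]))
# {'C1-0': [0, 1, 2, 3], 'C2-0': [4, 5, 6, 7], 'C2-1': [8, 9, 10]}
```

Both printed results match the two assignments listed above.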

That's it.

The same configuration wouldn't work for the RoundRobin strategy, as it requires equal num.streams across all consumers subscribed to this topic, so let's assume both consumers have num.streams set to 2 now. One major difference compared to the Range strategy is that you cannot predict what the assignment will be prior to the rebalance. Here's how that would work with the RoundRobin strategy:

First, there are 2 conditions that MUST be satisfied before actual assignment:

a) Every topic has the same number of streams within a consumer instance (that's why I mentioned above that a different number of threads per consumer will not work).
b) The set of subscribed topics is identical for every consumer instance within the group (we have one topic here, so that's not a problem now).

Once these 2 conditions are verified, the topic-partition pairs are sorted by hashcode to reduce the possibility of all partitions of one topic being assigned to one consumer (if there is more than one topic to be consumed).

And finally, all topic-partition pairs are assigned in a round-robin fashion to the available consumer threads. For example, if our topic-partitions end up sorted like this: T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9 and the consumer threads are C1-0, C1-1, C2-0, C2-1, then the assignment will be like this:

T1-5 goes to C1-0
T1-3 goes to C1-1
T1-0 goes to C2-0
T1-8 goes to C2-1
At this point no more consumer threads are left, but there are still more topic-partitions, so iteration over consumer threads starts over:
T1-2 goes to C1-0
T1-1 goes to C1-1
T1-4 goes to C2-0
T1-7 goes to C2-1
And again:
T1-6 goes to C1-0
T1-9 goes to C1-1

At this point all topic-partitions are assigned and each consumer thread has a near-equal number of partitions.
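
The round-robin step can be sketched in a few lines of Python. This is only an illustration, not Kafka's implementation: `round_robin_assign` is a made-up name, and the hashcode sort is not reproduced here, so the already-sorted order from the example is passed in as given.

```python
from itertools import cycle


def round_robin_assign(sorted_pairs, threads):
    """Hypothetical helper: deal topic-partition pairs out to threads in turn."""
    assignment = {thread: [] for thread in threads}
    # cycle() restarts the thread list each time it runs out of threads.
    for pair, thread in zip(sorted_pairs, cycle(threads)):
        assignment[thread].append(pair)
    return assignment


# Order taken from the example above (assumed to be the result of the hashcode sort).
pairs = ["T1-5", "T1-3", "T1-0", "T1-8", "T1-2",
         "T1-1", "T1-4", "T1-7", "T1-6", "T1-9"]
threads = ["C1-0", "C1-1", "C2-0", "C2-1"]

for thread, owned in round_robin_assign(pairs, threads).items():
    print(thread, owned)
# C1-0 ['T1-5', 'T1-2', 'T1-6']
# C1-1 ['T1-3', 'T1-1', 'T1-9']
# C2-0 ['T1-0', 'T1-4']
# C2-1 ['T1-8', 'T1-7']
```

With 10 pairs and 4 threads, two threads end up with 3 partitions and two with 2, which is the near-equal split mentioned above.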

Hope this helps.

serejja answered Oct 26 '22 14:10


You could read the Kafka docs at http://kafka.apache.org/documentation/#impl_brokerregistration, which cover the consumer registration algorithm and the consumer rebalancing algorithm.

As the docs say, each consumer does the following during rebalancing:

1. For each topic T that C_i subscribes to
2.   let P_T be all partitions producing topic T
3.   let C_G be all consumers in the same group as C_i that consume topic T
4.   sort P_T (so partitions on the same broker are clustered together)
5.   sort C_G
6.   let i be the index position of C_i in C_G and let N = size(P_T)/size(C_G)
7.   assign partitions from i*N to (i+1)*N - 1 to consumer C_i
8.   remove current entries owned by C_i from the partition owner registry
9.   add newly assigned partitions to the partition owner registry
     (we may need to re-try this until the original partition owner releases its ownership)
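
Steps 4 to 7 of that pseudocode can be read literally as the following Python sketch. The name `docs_assign` is hypothetical, treating the division as integer division is my assumption, and a plain sort stands in for the broker-aware ordering of step 4. The registry updates in steps 8 and 9 are left out.

```python
def docs_assign(partitions, consumers, me):
    """Hypothetical helper: partitions that consumer `me` would claim for one topic."""
    sorted_partitions = sorted(partitions)  # step 4 (plain sort, an assumption)
    sorted_consumers = sorted(consumers)    # step 5
    i = sorted_consumers.index(me)          # step 6: position of this consumer
    n = len(sorted_partitions) // len(sorted_consumers)  # step 6: N, integer division assumed
    # Step 7: partitions i*N through (i+1)*N - 1 inclusive.
    return sorted_partitions[i * n:(i + 1) * n]


print(docs_assign(range(6), ["c1", "c2", "c3"], "c2"))
# [2, 3]
```

Note that with this literal reading, 10 partitions and 3 consumers gives N = 3, so partitions 0 to 8 are claimed and partition 9 is not covered. The Range example in the other answer hands such leftovers to the first threads, which this simplified pseudocode does not spell out.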

Also notice that:

If there are more consumers than partitions, some consumers won't get any data at all. During rebalancing, we try to assign partitions to consumers in such a way that reduces the number of broker nodes each consumer has to connect to.

GuangshengZuo answered Oct 26 '22 12:10