I have been working with Kafka lately and have a bit of confusion regarding the consumers under a consumer group. The center of the confusion is whether to implement consumers as processes or threads. For this question, assume I am using the high-level consumer.
Let's consider a scenario that I have experimented with. In my topic there are 2 partitions (for simplicity let's assume the replication factor is just 1). I created a consumer (ConsumerConnector) process consumer1 with group group1, then created a topic count map of size 2 and then spawned 2 consumer threads, consumer1_thread1 and consumer1_thread2, under that process. It looks like consumer1_thread1 is consuming partition 0 and consumer1_thread2 is consuming partition 1. Is this behaviour always deterministic? Below is the code snippet. Class TestConsumer is my consumer thread class.
...
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
// Request 2 streams for the topic, one per consumer thread.
topicCountMap.put(topic, 2);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
executor = Executors.newFixedThreadPool(2);
int threadNumber = 0;
// Hand each stream to its own TestConsumer thread.
for (final KafkaStream<byte[], byte[]> stream : streams) {
    executor.submit(new TestConsumer(stream, threadNumber));
    threadNumber++;
}
...
Now, let's consider another scenario (which I haven't experimented with but am curious about) where I start 2 consumer processes, consumer1 and consumer2, both having the same group group1, and each of them is a single-threaded process. Now my questions are:
How will the two independent consumer processes (under the same group nevertheless) be related to the partitions in this case? How is it different from the above single-process, multi-threaded scenario?
In general, how are consumer threads or processes mapped / related to partitions in the topic?
The Kafka documentation does say that each consumer under a consumer group will consume one partition. However, does that refer to a consumer thread (like my above code example) or independent consumer processes?
Is there any subtle thing I am missing here regarding implementing consumers as processes vs threads? Thanks in advance.
A consumer can be assigned to consume multiple partitions. The rule in Kafka is that, within a consumer group, a given partition of a topic is assigned to only one consumer, so two consumers from the same group cannot both be reading the same message from that partition.
When implementing a multi-threaded consumer architecture, it is important to note that the Kafka consumer is not thread-safe. Multi-threaded access must be properly synchronized, which can be tricky. This is why the single-threaded model is commonly used.
The consumers in a group divide the topic partitions as fairly amongst themselves as possible, with each partition consumed by only a single consumer from the group. When there are fewer consumers than partitions, some consumers will read messages from more than one partition.
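To make that division concrete, here is a minimal sketch in plain Java of a range-style split. It is only an illustration under assumptions: the names PartitionAssignmentSketch and assign are hypothetical, and this is not Kafka's own assignor code. It just follows the general idea of sorting the group members and handing out contiguous ranges of partitions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not part of the Kafka API.
class PartitionAssignmentSketch {

    // Splits partitions 0..numPartitions-1 into contiguous ranges over the
    // sorted consumer ids. When the split is uneven, the first
    // (numPartitions % consumers) consumers receive one extra partition.
    static Map<String, List<Integer>> assign(List<String> consumerIds, int numPartitions) {
        List<String> sorted = new ArrayList<>(consumerIds);
        Collections.sort(sorted);
        Map<String, List<Integer>> result = new TreeMap<>();
        if (sorted.isEmpty()) {
            return result;
        }
        int base = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size();
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            result.put(sorted.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // As many consumers as partitions: one partition each.
        System.out.println(assign(Arrays.asList("consumer1", "consumer2"), 2));
        // Fewer consumers than partitions: some consumers own several.
        System.out.println(assign(Arrays.asList("consumer1", "consumer2"), 5));
        // More consumers than partitions: the last consumer stays idle.
        System.out.println(assign(Arrays.asList("consumer1", "consumer2", "consumer3"), 2));
    }
}
```

In this sketch every partition ends up with exactly one owner, while a consumer may own zero, one, or several partitions depending on the counts.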
The way consumers maintain membership in a consumer group and ownership of the partitions assigned to them is by sending heartbeats to a Kafka broker designated as the group coordinator (this broker can be different for different consumer groups).
A consumer group can have multiple consumer instances running (multiple processes with the same group-id). While consuming, each partition is consumed by exactly one consumer instance in the group.
E.g. if your topic contains 2 partitions and you start a consumer group group-A with 2 consumer instances, then each one of them will be consuming messages from a particular partition of the topic.
If you start the same 2 consumers with different group ids, group-A and group-B, then the messages from both partitions of the topic will be broadcast to each one of them. So in that case the consumer instance running under group-A will receive messages from both partitions of the topic, and the same is true for group-B as well.
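The reason both groups see everything is that read positions are tracked per group. Below is a tiny in-memory model of a single partition to illustrate this. Everything in it (GroupOffsetsSketch, append, poll) is a made-up name for the sake of the example and not the Kafka client API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of one partition; not the Kafka API.
class GroupOffsetsSketch {
    private final List<String> log = new ArrayList<>();
    // Each group id keeps its own read position in the log.
    private final Map<String, Integer> offsets = new HashMap<>();

    void append(String message) {
        log.add(message);
    }

    // Returns the messages this group has not read yet and advances
    // only this group's position; other groups are unaffected.
    List<String> poll(String groupId) {
        int from = offsets.getOrDefault(groupId, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        offsets.put(groupId, log.size());
        return batch;
    }

    public static void main(String[] args) {
        GroupOffsetsSketch partition = new GroupOffsetsSketch();
        partition.append("m0");
        partition.append("m1");
        System.out.println(partition.poll("group-A")); // [m0, m1]
        System.out.println(partition.poll("group-B")); // [m0, m1]
        System.out.println(partition.poll("group-A")); // []
    }
}
```

Reading as group-A does not consume anything from group-B's point of view, which is why two groups behave like two independent subscribers.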
Read more on this in the Kafka documentation.
EDIT : Based on your comment which says,
I was wondering what is the effective difference between having 2 consumer threads under the same process as opposed to 2 consumer processes (group being the same in both cases)
The consumer group-id is the same (global) across the cluster. Suppose you have started process-one with 2 threads and then spawn another process (maybe on a different machine) with the same groupId and 2 more threads; Kafka will then add these 2 new threads to the set consuming messages from the topic. So eventually there will be 4 threads responsible for consuming from the same topic. Kafka will then trigger a re-balance to re-assign partitions to threads, so a partition that was being consumed by thread T1 of process P1 may be re-allocated to thread T2 of process P2. The few lines below are taken from the wiki page:
When a new process is started with the same Consumer Group name, Kafka will add that processes' threads to the set of threads available to consume the Topic and trigger a 're-balance'. During this re-balance Kafka will assign available partitions to available threads, possibly moving a partition to another process. If you have a mixture of old and new business logic, it is possible that some messages go to the old logic.
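A rough way to picture the re-balance is the following plain-Java model. Treat it as a sketch under assumptions: RebalanceSketch, join and leave are hypothetical names, thread ids such as P1-T1 are invented for the example, and the range-style split over sorted thread ids is a simplification rather than Kafka's actual code. The real outcome depends on the assignment strategy and the ids involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical model of group membership; not the Kafka API.
class RebalanceSketch {
    private final int numPartitions;
    // Thread ids of all live members, kept sorted.
    private final TreeSet<String> threads = new TreeSet<>();

    RebalanceSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // A process joins with some threads; membership changed, so re-balance.
    Map<Integer, String> join(String processId, int threadCount) {
        for (int t = 1; t <= threadCount; t++) {
            threads.add(processId + "-T" + t);
        }
        return rebalance();
    }

    // A process goes away; its threads leave and the rest take over.
    Map<Integer, String> leave(String processId) {
        threads.removeIf(id -> id.startsWith(processId + "-"));
        return rebalance();
    }

    // Recomputes partition -> owning thread using contiguous ranges
    // over the sorted thread ids.
    private Map<Integer, String> rebalance() {
        Map<Integer, String> owners = new TreeMap<>();
        if (threads.isEmpty()) {
            return owners;
        }
        List<String> sorted = new ArrayList<>(threads);
        int base = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size();
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            for (int j = 0; j < count; j++) {
                owners.put(next++, sorted.get(i));
            }
        }
        return owners;
    }

    public static void main(String[] args) {
        RebalanceSketch group = new RebalanceSketch(4);
        System.out.println(group.join("P1", 2)); // P1's threads own all 4 partitions
        System.out.println(group.join("P2", 2)); // partitions 2 and 3 move to P2
        System.out.println(group.leave("P1"));   // P2's threads take over everything
    }
}
```

With 4 partitions, the second join moves partitions from P1's threads to P2's threads. With only 2 partitions and this simplified split, the two threads of P2 would sit idle until P1 leaves, at which point they pick up both partitions.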
The main design reason for running multiple consumer instances with the same group id, rather than a single consumer instance, is resiliency. For example, if you have a single consumer with two threads and its machine goes down, you lose all consumers. If you have two separate consumer instances with the same group id, each on a different host, then the group can survive the failure of one host. Ideally each consumer instance in the above example should have two threads, so that if one host goes down the other instance uses its dormant thread to take up the other partition. Indeed, it is desirable to have more threads than partitions to cover this case.