Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Consumer Id and Group Id in Kafka: what makes two consumers the same

I've been using Kafka for some months now, and I realized that some of the core concepts are not so clear for me yet. My doubt is related to the relation between consumerId, groupId and offsets. In our application we need Kafka to work using the publish - subscribe paradigm, so we use diffent group ids for each consumer, which are randomly generated.

I used to think that setting auto.offset.reset = latest my consumers will always receive the messages they have not received yet, but lately I learned that is not the case. That only works if the consumer has not offsets committed yet. In any other case, the consumer will continue receiving messages with offset greater than the last offset it committed.

Since I always create new consumers with random group ids, I realized that my consumers have "no memory", they are new consumers and they will never have offsets committed, so the auto.offset.reset = latest policy will always apply. And here is where my doubts start. Suppose the following scenario:

  1. I have two client applications, A and B, with one consumer each, working in the publish - subscribe way (thus, with different groups ids). Both consumers are subscribe to the topic my-topic. auto.offset.resetsetting is latest for both consumers.
  2. Some producer (or producers) publish messages M1, M2 and M3 to topic my-topic.
  3. Both A and B receive M1, M2 and M3.
  4. Now I shutdown application B.
  5. Producers produce messages M4 and M5.
  6. Application A receives messages M4 and M5.
  7. Now I restart application B. Remember, groupId is random, and I am not setting any consumer id, so that means this is a new consumer (right?). Application B does not receive any message.
  8. Producers publish messages M6 and M7.
  9. Both applications A and B receive messages M6 and M7.

So, summarizing, if I am not wrong, A receives all messages but B has missed M4 and M5. I've tried this with kafka-console-consumer.sh and it behaves this way.

So, how can I make application B receive the messages published while it was shut down? I now if I start it assigning the same groupId as when it was originally started, it will read messages M4 and M5, but that is setting the group id. Is it possible to set the consumer id too, and get the same behaviour?

Or put another way, what is understood by starting the same consumer again? Two consumers are the same consumer if they have the same groupId, the same consumerId, both?

By the way, consumerId and the property client.id are the same?

like image 502
joanlofe Avatar asked May 16 '19 16:05

joanlofe


2 Answers

Two consumers are in the same group if they have the same group.id setting.

I'm not entirely sure what you mean with consumerId. As of Kafka 2.2, no such field exist in the consumer configurations.

If you're talking about client.id, this setting has no functional effect, it's only used to tag requests so they can be matched in the broker's log if needed.

When you run a consumer with auto.offset.reset=latest, if no committed offsets exist, the consumer will restart consuming from the end of the log. So it will only receive messages that are produced after it starts. So in your scenario, you are correct, it will never receive M4 and M5.

If you want to consume all messages, you need to keep the same group.id. In that case, auto.offset.reset will only apply the first time the consumer starts. That way, when your consumer restarts, it will pick up where it was when it stopped.

like image 99
Mickael Maison Avatar answered Nov 15 '22 02:11

Mickael Maison


It is because you are setting the auto.offset.reset = latest

Any messages that was sent during the consumer that is not up and running will not be processed by the consumer.

So B will missout the two messages

like image 36
Arunasalam Govindasamy Avatar answered Nov 15 '22 02:11

Arunasalam Govindasamy