I've been using Kafka for some months now, and I realized that some of the core concepts are still not clear to me. My question is about the relationship between consumerId, groupId and offsets. In our application we need Kafka to work using the publish-subscribe paradigm, so we use a different, randomly generated group id for each consumer.
I used to think that by setting auto.offset.reset = latest, my consumers would always receive the messages they had not received yet, but lately I learned that is not the case. That only applies if the consumer has no committed offsets yet. Otherwise, the consumer continues receiving messages from the last offset it committed onwards.
Since I always create new consumers with random group ids, I realized that my consumers have "no memory": they are new consumers, they will never have committed offsets, and so the auto.offset.reset = latest policy will always apply. This is where my doubts start. Suppose the following scenario:
Applications A and B both consume from my-topic. The auto.offset.reset setting is latest for both consumers. Application B is shut down, and messages M4 and M5 are published to my-topic while it is down. Then B is started again. Its groupId is random, and I am not setting any consumer id, so that means this is a new consumer (right?). Application B does not receive any message.

So, summarizing, if I am not wrong, A receives all messages but B has missed M4 and M5. I've tried this with kafka-console-consumer.sh and it behaves this way.
So, how can I make application B receive the messages published while it was shut down? I know that if I start it with the same groupId it used originally, it will read messages M4 and M5, but that means fixing the group id. Is it possible to set the consumer id instead and get the same behaviour?
Or, put another way, what does "starting the same consumer again" mean? Are two consumers the same consumer if they have the same groupId, the same consumerId, or both?
By the way, are consumerId and the client.id property the same thing?
Two consumers are in the same group if they have the same group.id setting.
I'm not entirely sure what you mean by consumerId. As of Kafka 2.2, no such field exists in the consumer configuration.
If you're talking about client.id, this setting has no effect on offsets or group membership; it's mainly used to tag requests so they can be matched in the broker's logs if needed.
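To make the distinction concrete, here is a toy model (plain Python, not the Kafka client API) of the fact that committed offsets are stored per group, topic and partition. The functions commit and lookup, the group names and the single-partition topic are hypothetical; client_id is passed in only to show that it plays no part in finding the committed offset.

```python
from typing import Dict, Optional, Tuple

# Toy model of the broker-side offset store, NOT the Kafka client API.
# Assumption for illustration: a single topic "my-topic" with one partition, 0.
committed_offsets: Dict[Tuple[str, str, int], int] = {}


def commit(group_id: str, client_id: str, offset: int) -> None:
    # client_id is accepted only to show that it is not part of the key.
    committed_offsets[(group_id, "my-topic", 0)] = offset


def lookup(group_id: str, client_id: str) -> Optional[int]:
    # The committed offset is found by group id (plus topic/partition) alone.
    return committed_offsets.get((group_id, "my-topic", 0))


commit("group-b", client_id="b-1", offset=3)
same_group = lookup("group-b", client_id="b-2")     # 3: different client.id, same group
new_group = lookup("random-1234", client_id="b-1")  # None: new group, so auto.offset.reset applies
print(same_group, new_group)
```

In other words, what makes a restarted consumer "the same consumer" as far as offsets are concerned is its group.id, not its client.id.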
When you run a consumer with auto.offset.reset=latest and no committed offsets exist, the consumer will start consuming from the end of the log, so it will only receive messages that are produced after it starts. So in your scenario you are correct: with a new random group.id, B will never receive M4 and M5.
If you want to consume all messages, you need to keep the same group.id. In that case, auto.offset.reset will only apply the first time the consumer starts. That way, when your consumer restarts, it will pick up where it was when it stopped.
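A minimal simulation of the scenario from the question may help. It is a toy model in plain Python, not the Kafka client: it assumes a single partition, auto.offset.reset=latest for every consumer, and a commit after each poll. ToyTopic, ToyConsumer, the messages M1 to M3 and the group names are hypothetical.

```python
from typing import Dict, List


class ToyTopic:
    """Single-partition log plus committed offsets keyed by group.id."""

    def __init__(self) -> None:
        self.log: List[str] = []
        self.committed: Dict[str, int] = {}  # group.id -> next offset to read

    def produce(self, *msgs: str) -> None:
        self.log.extend(msgs)


class ToyConsumer:
    def __init__(self, topic: ToyTopic, group_id: str) -> None:
        self.topic = topic
        self.group_id = group_id
        # Resume from the group's committed offset if there is one; otherwise
        # apply auto.offset.reset=latest, i.e. start at the current end of the log.
        self.position = topic.committed.get(group_id, len(topic.log))

    def poll(self) -> List[str]:
        records = self.topic.log[self.position:]
        self.position = len(self.topic.log)
        self.topic.committed[self.group_id] = self.position  # commit after processing
        return records


topic = ToyTopic()
a = ToyConsumer(topic, "group-a")
b = ToyConsumer(topic, "random-1")
topic.produce("M1", "M2", "M3")
a.poll()
b.poll()                                           # both receive M1, M2, M3
topic.produce("M4", "M5")                          # B is down at this point
a_got = a.poll()                                   # ['M4', 'M5']
b_new_group = ToyConsumer(topic, "random-2").poll()   # []: new random group.id, reset to latest
b_same_group = ToyConsumer(topic, "random-1").poll()  # ['M4', 'M5']: resumes from its commit
print(a_got, b_new_group, b_same_group)
```

Restarting B under a fresh group.id finds no committed offset and jumps to the end of the log, while restarting it under its original group.id resumes from the offset that group committed before shutting down.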
It is because you are setting auto.offset.reset = latest while using a new random group id on every start. Any messages sent while the consumer is not up and running will not be processed by it, so B will miss the two messages.