I am new to Kafka and am trying to build a producer-consumer app with it. I am able to produce messages to Kafka, but when I try to consume them back with a consumer, it returns 0 records.
When I check the offset for my consumer group, I can see that the offset equals the log length (1M in my case, the same as the number of records).
If I use this config property while creating my consumer, it reads from the beginning:
configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
But my requirement is that if I restart the consumer, it should start from its previous end point, like AMQ.
Is there anything I am missing here? I thought the offset should change only after a consumer polls. Why is it set to the full log length at the very beginning?
As the link describes, there are a couple of scenarios you need to consider:
1. Starting a new consumer (new group.id): in this case there is no committed offset, so the consumer starts reading according to the auto.offset.reset setting.
2. Restarting a consumer (reuse of group.id): in this case the consumer resumes where it left off. The auto.offset.reset setting is ignored.
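To make the two scenarios concrete, here is a toy model of how the starting position is chosen. It is only a sketch of the rule described above, not Kafka's actual code: the class StartPosition and the method resolve are hypothetical names, and the committed offset of 250000 is an invented example value. It also assumes the committed offset is still within the log. Since auto.offset.reset defaults to latest, a brand-new group on a 1M-record log starts at offset 1M, which would explain a first poll that returns 0 records.

```java
import java.util.OptionalLong;

public class StartPosition {
    /**
     * Toy model (hypothetical helper, not part of the Kafka API):
     * picks the offset a consumer would start from on one partition.
     */
    static long resolve(OptionalLong committed, String autoOffsetReset,
                        long beginning, long end) {
        if (committed.isPresent()) {
            // Scenario 2: the group already has a committed offset,
            // so auto.offset.reset is not consulted.
            return committed.getAsLong();
        }
        // Scenario 1: no committed offset, fall back to auto.offset.reset.
        switch (autoOffsetReset) {
            case "earliest":
                return beginning;
            case "latest":
                return end;
            default:
                // With "none", the real client raises an error instead of guessing.
                throw new IllegalStateException(
                    "no committed offset and auto.offset.reset=" + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        long end = 1_000_000L; // log end offset from the question
        // New group, default policy: starts at the end, so nothing to read yet.
        System.out.println(resolve(OptionalLong.empty(), "latest", 0L, end));
        // New group with "earliest": starts at the beginning of the log.
        System.out.println(resolve(OptionalLong.empty(), "earliest", 0L, end));
        // Restarted group: resumes from its committed offset regardless of policy.
        System.out.println(resolve(OptionalLong.of(250_000L), "earliest", 0L, end));
    }
}
```

Under this model, keeping the same group.id across restarts gives the "resume from the previous end point" behaviour asked for, and "earliest" only matters the first time the group reads the topic.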
Thus, for scenario (1) you can simply configure your starting position. For scenario (2), your starting position is fixed (i.e., always the last committed offset) and cannot be changed via a config. However, you can always call .seekToBeginning() or .seekToEnd() before your first call to poll() and either read the whole topic or start at the end of the topic. A call to .seekXX() overrides the last committed offset and allows you to start consuming at any offset you like. Note that there is also seek(), which takes an offset parameter, so you can specify any offset you want to start consuming from.
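A minimal sketch of the seek calls might look like the following. It assumes the kafka-clients library is on the classpath and a broker is reachable at localhost:9092. The topic name my-topic, partition 0, the group id my-group, and the class and helper names are all placeholders, not values from the question. It uses assign() so that the partition is known before seeking; with subscribe(), partitions are only assigned during poll(), so the seek would typically go into a ConsumerRebalanceListener.onPartitionsAssigned callback instead.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SeekSketch {
    // Hypothetical helper: consumer settings for an assumed local broker.
    static Properties config(String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  StringDeserializer.class.getName());
        return props;
    }

    public static void main(String[] args) {
        // Placeholder topic and partition; replace with your own.
        TopicPartition tp = new TopicPartition("my-topic", 0);

        try (KafkaConsumer<String, String> consumer =
                 new KafkaConsumer<>(config("my-group"))) {
            // Manual assignment, so the partition is known before the first poll().
            consumer.assign(List.of(tp));

            // Override the last committed offset: re-read the whole partition.
            consumer.seekToBeginning(List.of(tp));
            // Alternatives:
            // consumer.seekToEnd(List.of(tp)); // skip everything already in the log
            // consumer.seek(tp, 42L);          // start at an explicit offset

            ConsumerRecords<String, String> records =
                consumer.poll(Duration.ofSeconds(1));
            System.out.println("Fetched " + records.count() + " records");
        }
    }
}
```

If the goal is simply to resume after a restart, none of the seek calls are needed: reusing the same group.id is enough, and the seeks are only for deliberately overriding the committed position.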