I see from the logs that the exact same message is consumed 665 times. Why does this happen?
I also see this in the logs:
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies
that the poll loop is spending too much time message processing. You can address this either by increasing the session
timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
Consumer properties
group.id=someGroupId
bootstrap.servers=kafka:9092
enable.auto.commit=false
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
session.timeout.ms=30000
max.poll.records=20
PS: Is it possible to consume only a specific number of messages, like 10, 50, or 100, out of the 1000 that are in the queue? I was looking at the 'fetch.max.bytes' config, but it seems to apply to message size rather than the number of messages.
Thanks
The answer lies in understanding how max.poll.interval.ms, session.timeout.ms, and consumer group rebalancing interact.
In your case, your consumer receives a message via poll() but is not able to complete the processing within max.poll.interval.ms. It is therefore assumed to have hung, a rebalance of partitions happens, and the consumer loses ownership of all its partitions. It is marked dead and is no longer part of the consumer group.
Then, when your consumer finishes processing and calls poll() again, two things happen:
1. The commit fails, because the consumer has already been removed from the group, so the consumed offset is never advanced.
2. A rebalance takes place, the consumer rejoins the group, and, since the committed offset never moved, it is handed the same message again.
The consumer again takes a long time to process, again fails to finish within max.poll.interval.ms, and 1. and 2. keep repeating in a loop.
To fix the problem, you can increase max.poll.interval.ms to a value large enough for the time your consumer needs to process a batch. Then your consumer will not be marked as dead and will not receive duplicate messages. However, the real fix is to review your processing logic and try to reduce the processing time.
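As a minimal sketch (not your exact code), the loop below raises max.poll.interval.ms and commits manually only after the records have been processed. The topic name someTopic and the 10-minute interval are assumptions to adapt to your workload.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SlowProcessingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092");
        props.put("group.id", "someGroupId");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.records", "20");
        // Allow up to 10 minutes between poll() calls before the group
        // considers this consumer dead and triggers a rebalance.
        props.put("max.poll.interval.ms", "600000");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("someTopic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // your (slow) processing logic goes here
                }
                // Commit only after processing, so a crash mid-batch causes
                // re-delivery rather than silently dropped messages.
                consumer.commitSync();
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // Placeholder for the actual work.
        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
    }
}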
The fix is described in the message you pasted:
You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
The reason is that a timeout is reached before your consumer is able to process and commit the message. When your Kafka consumer "commits", it is basically acknowledging receipt of the previous message, advancing the offset, and therefore moving on to the next message. But if that timeout is exceeded (as is the case for you), the consumer's commit isn't effective because it happens too late; the next time the consumer asks for a message, it is given the same message again.
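As a hedged illustration of what "advancing the offset" means, the helper below (the class name OffsetCommits is made up) commits record.offset() + 1, i.e. the position of the next message to read. If that commit never takes effect because the group has already rebalanced, the group's position stays where it was and the same message is delivered again.

import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class OffsetCommits {
    // Commit the offset *after* the record just processed, so the group's
    // committed position moves past it and the message is not re-delivered.
    static void commitProcessed(KafkaConsumer<String, String> consumer,
                                ConsumerRecord<String, String> record) {
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        consumer.commitSync(
                Collections.singletonMap(tp, new OffsetAndMetadata(record.offset() + 1)));
    }
}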
Some of your options are to:
- Increase session.timeout.ms=30000, so the consumer has more time to process the messages
- Decrease max.poll.records=20, so the consumer has fewer messages it'll need to work on before the timeout occurs. But this doesn't really apply to you because your consumer is already only working on a single message
- Enable enable.auto.commit, which probably also isn't the best solution for you because it might result in dropping messages, as mentioned below:
If we allowed offsets to auto commit as in the previous example messages would be considered consumed after they were given out by the consumer, and it would be possible that our process could fail after we have read messages into our in-memory buffer but before they had been inserted into the database. Source: https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
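Putting the options together, one plausible configuration (the exact values are assumptions, to be tuned against your actual processing time) could look like:

group.id=someGroupId
bootstrap.servers=kafka:9092
enable.auto.commit=false
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# give each batch more time before the group considers the consumer gone
session.timeout.ms=60000
max.poll.interval.ms=600000
# smaller batches mean less work per poll()
max.poll.records=10

Note that max.poll.records also answers the PS in the question: it caps the number of messages returned by a single poll(), whereas fetch.max.bytes only limits the byte size of a fetch.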