 

Why is the kafka consumer consuming the same message hundreds of times?

Tags:

apache-kafka

I see from the logs that the exact same message is consumed 665 times. Why does this happen?

I also see this in the logs

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. 
This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies 
that the poll loop is spending too much time message processing. You can address this either by increasing the session 
timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

Consumer properties

group.id=someGroupId
bootstrap.servers=kafka:9092
enable.auto.commit=false
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
session.timeout.ms=30000
max.poll.records=20

PS: Is it possible to consume only a specific number of messages, like 10 or 50 or 100, out of the 1000 that are in the queue? I was looking at the 'fetch.max.bytes' config, but it seems to limit the fetch by size in bytes rather than by the number of messages.

Thanks

asked Aug 07 '18 by user1860447

2 Answers

The answer lies in understanding the following concepts:

  • session.timeout.ms
  • heartbeats
  • max.poll.interval.ms

In your case, your consumer receives a message via poll() but is not able to complete the processing within max.poll.interval.ms. The broker therefore assumes it has hung, a rebalance of the partitions is triggered, and the consumer loses ownership of all its partitions. It is marked dead and is no longer part of the consumer group.

Then, when your consumer completes the processing and calls poll() again, two things happen:

  1. Commit fails as the consumer no longer owns the partitions.
  2. The broker sees that the consumer is alive again, so another rebalance is triggered; the consumer rejoins the consumer group, is assigned partitions, and requests messages from the broker. Since the earlier message was never marked as committed (see #1 above, the failed commit) and is still pending processing, the broker delivers the same message to the consumer again.

The consumer again takes a long time to process, and since it cannot finish within max.poll.interval.ms, steps 1 and 2 keep repeating in a loop.
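For illustration, here is a minimal sketch of the poll/commit loop this describes. The topic name and the process() method are made-up placeholders; the properties mirror the ones in the question.

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SlowConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092");
        props.put("group.id", "someGroupId");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("session.timeout.ms", "30000");
        props.put("max.poll.records", "20");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("some-topic")); // hypothetical topic name

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                // If processing the batch takes longer than max.poll.interval.ms,
                // the consumer is considered dead and its partitions are rebalanced.
                process(record);
            }
            try {
                // Step 1 above: this fails with CommitFailedException after a rebalance.
                consumer.commitSync();
            } catch (CommitFailedException e) {
                // The offsets were not advanced, so the next poll() (step 2) rejoins the
                // group and the same records are delivered again.
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // stand-in for the (slow) real processing
    }
}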

To fix the problem, you can increase max.poll.interval.ms to a value large enough for the time your consumer needs for processing. Then your consumer will not be marked as dead and will not receive duplicate messages. However, the real fix is to check your processing logic and try to reduce the processing time.
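As a rough, illustrative calculation (the per-record time is an assumption, not a measurement): if each record can take up to 30 seconds to process and max.poll.records is 20, one batch may need 20 × 30 000 ms = 600 000 ms, so the setting would have to be at least:

max.poll.interval.ms=600000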

answered Oct 20 '22 by Vikram Rawat


The fix is described in the message you pasted:

You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

The reason is that a timeout is reached before your consumer is able to process and commit the message. When your Kafka consumer "commits", it's basically acknowledging receipt of the previous message, advancing the offset, and therefore moving on to the next message. But if that timeout has passed (as is the case for you), the consumer's commit isn't effective because it happens too late; the next time the consumer asks for a message, it's given the same message again.
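In code terms, "advancing the offset" means committing offset N + 1 for a partition once record N is handled; a minimal sketch (the class and method names are made up):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;

public class OffsetCommitSketch {
    // Committing offset N + 1 tells the broker that record N is done, so the next
    // fetch (even by another consumer after a rebalance) starts at N + 1 instead of
    // re-delivering record N.
    static void acknowledge(KafkaConsumer<String, String> consumer,
                            ConsumerRecord<String, String> record) {
        consumer.commitSync(Collections.singletonMap(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1)));
    }
}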

Some of your options are to:

  1. Increase session.timeout.ms (currently 30000) so the consumer has more time to process the messages
  2. Decrease max.poll.records (currently 20) so the consumer has fewer messages to work through before the timeout occurs. But this doesn't really apply to you, because your consumer is already working on just a single message
  3. Or turn on enable.auto.commit, which probably isn't the best solution for you either, because it can result in dropped messages, as mentioned below (see the manual-commit sketch after this list):

    If we allowed offsets to auto commit as in the previous example messages would be considered consumed after they were given out by the consumer, and it would be possible that our process could fail after we have read messages into our in-memory buffer but before they had been inserted into the database. Source: https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
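To make that trade-off concrete, here is a hedged fragment of the manual-commit pattern the quote argues for: commit only after the downstream write succeeds, so a crash in between causes a re-delivery rather than a silently lost message. saveToDatabase() is a made-up placeholder, and consumer is assumed to be a configured KafkaConsumer as in the question.

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
    saveToDatabase(record.value()); // hypothetical downstream write
}
consumer.commitSync(); // acknowledge only after all writes have succeeded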

answered Oct 20 '22 by gunit