Why does a Kafka consumer take a long time to start consuming?

We start a Kafka consumer, listening on a topic which may not yet be created (topic auto creation is enabled though).

Not long thereafter a producer is publishing messages on that topic.

However, it takes some time for the consumer to notice this: 5 minutes, to be exact. At that point the consumer revokes its partitions and rejoins the consumer group, and Kafka then re-stabilizes the group. Comparing the timestamps in the consumer and Kafka logs, this process is initiated on the consumer side.

I suppose this is expected behavior, but I would like to understand it. Is this actually a rebalance (from 0 to 1 partition)? If we created the topics up front, would this not happen?

2017-02-01 08:36:45.692  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:36:45.692  INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2017-02-01 08:36:45.693  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group tps-kafka-partitioning
2017-02-01 08:36:45.738  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group tps-kafka-partitioning with generation 1
2017-02-01 08:36:45.747  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:36:45.749  INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[]
2017-02-01 08:41:45.540  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:41:45.544  INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2017-02-01 08:41:45.544  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group tps-kafka-partitioning

Kafka logs:

[2017-02-01 08:41:45,546] INFO [GroupCoordinator 1001]: Preparing to restabilize group tps-kafka-partitioning with old generation 1 (kafka.coordinator.GroupCoordinator)
[2017-02-01 08:41:45,546] INFO [GroupCoordinator 1001]: Stabilized group tps-kafka-partitioning generation 2 (kafka.coordinator.GroupCoordinator)
[2017-02-01 08:41:45,551] INFO [GroupCoordinator 1001]: Assignment received from leader for group tps-kafka-partitioning for generation 2 (kafka.coordinator.GroupCoordinator)
[2017-02-01 08:42:14,636] INFO [GroupCoordinator 1001]: Preparing to restabilize group tps-kafka-group-id with old generation 1 (kafka.coordinator.GroupCoordinator)
[2017-02-01 08:42:14,636] INFO [GroupCoordinator 1001]: Stabilized group tps-kafka-group-id generation 2 (kafka.coordinator.GroupCoordinator)
asked Feb 01 '17 09:02 by Raf


People also ask

Why is a Kafka consumer slow?

If many producers write data to the same topic while only a limited number of consumers read from it, reading will always be slow and real-time objectives are lost.

How can I make Kafka consumer faster?

Increasing the number of partitions and the number of brokers in a cluster will lead to increased parallelism of message consumption, which in turn improves the throughput of a Kafka cluster; however, the time required to replicate data across replica sets will also increase.

How do I clear Kafka consumer lag?

To "fast forward" the offsets of a consumer group, i.e. to clear the lag, you need to create a new consumer that joins the same group. In parallel, you can run the command that shows the lag, as you described, and you will see the lag cleared.

How do you know when a Kafka consumer is ready?

You can call consumer.assignment(), which returns the set of partitions currently assigned to the consumer, and verify that all partitions available for the topic are assigned.


1 Answer

This is probably due to the default value of the consumer parameter metadata.max.age.ms (300000 ms, i.e. 5 minutes), which controls how often the consumer forces a refresh of its metadata for a topic.
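As a quick sanity check of this explanation, the gap between the two "Revoking previously assigned partitions" lines in the question's consumer log can be compared with the 300000 ms default. The class and method names below are illustrative only; the timestamps are copied from the log above.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Compares the gap between the two rebalances in the consumer log
// with the default value of metadata.max.age.ms.
public class LogGap {
    // Matches the timestamp format used in the consumer log lines.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Default of the consumer setting metadata.max.age.ms (5 minutes).
    static final long DEFAULT_METADATA_MAX_AGE_MS = 300_000L;

    // Milliseconds elapsed between two log timestamps.
    static long gapMillis(String from, String to) {
        LocalDateTime start = LocalDateTime.parse(from, FORMAT);
        LocalDateTime end = LocalDateTime.parse(to, FORMAT);
        return Duration.between(start, end).toMillis();
    }

    public static void main(String[] args) {
        // First and second "Revoking previously assigned partitions" lines.
        long gap = gapMillis("2017-02-01 08:36:45.692", "2017-02-01 08:41:45.540");
        System.out.println("gap between rebalances: " + gap + " ms");
        System.out.println("default metadata.max.age.ms: " + DEFAULT_METADATA_MAX_AGE_MS + " ms");
    }
}
```

This prints a gap of 299848 ms, within a fraction of a second of the 300000 ms default.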

What happens when you start the consumer on a topic that does not exist yet is that the broker auto-creates the topic, but this takes a little time (leader election etc.). So when your consumer requests metadata for that topic, it gets a LEADER_NOT_AVAILABLE warning and cannot fetch any messages. Once the metadata.max.age.ms interval mentioned above has elapsed, the consumer refreshes its metadata, successfully this time, and starts reading messages. This does not depend on a producer writing messages to the topic; it is purely a consumer-side behavior.
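The timing described above can be illustrated with a deliberately simplified model. It assumes, as the answer describes, that the consumer fetches metadata once at startup (t = 0) and then only every metadata.max.age.ms, and that the topic becomes usable at some hypothetical moment `topicReadyMs` after startup. This is not the Kafka client's actual code path, just arithmetic on those assumptions.

```java
// Simplified model (an assumption, not the real client logic): metadata is
// fetched at t = 0 and then every metadataMaxAgeMs; the auto-created topic
// becomes usable at topicReadyMs (hypothetical value).
public class MetadataRefreshModel {

    // Time of the first metadata fetch at or after the topic became usable.
    static long firstSuccessfulFetchMs(long topicReadyMs, long metadataMaxAgeMs) {
        if (topicReadyMs <= 0) {
            return 0; // topic already existed when the consumer started
        }
        // Round up to the next multiple of the refresh interval.
        long refreshes = (topicReadyMs + metadataMaxAgeMs - 1) / metadataMaxAgeMs;
        return refreshes * metadataMaxAgeMs;
    }

    public static void main(String[] args) {
        long topicReadyMs = 200; // hypothetical: leader elected 200 ms after startup
        System.out.println(firstSuccessfulFetchMs(topicReadyMs, 300_000)); // default interval
        System.out.println(firstSuccessfulFetchMs(topicReadyMs, 1_000));   // 1000 ms interval
    }
}
```

With the default interval the topic is only seen after 300000 ms, even though it was ready after 200 ms; with a 1000 ms interval it is seen after 1000 ms. If the topic already exists at startup, the first fetch succeeds and there is no delay.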

If you start your consumer with metadata.max.age.ms set to, for example, 1000 ms, you should see a much shorter delay before messages are consumed.
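A minimal sketch of such a consumer configuration, built as plain `java.util.Properties` so it runs without the Kafka client on the classpath. The broker address `localhost:9092` is an assumption; the group id is taken from the question's logs; the config keys are standard Kafka consumer settings.

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("group.id", "tps-kafka-partitioning");  // group name from the logs
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Refresh topic metadata every second instead of the default 300000 ms,
        // so a freshly auto-created topic is noticed quickly.
        props.put("metadata.max.age.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps();
        System.out.println("metadata.max.age.ms = " + props.getProperty("metadata.max.age.ms"));
        // With kafka-clients on the classpath, these would be passed to
        // new KafkaConsumer<String, String>(props).
    }
}
```

Each refresh is an extra metadata request to the brokers, so a very small value trades some additional request traffic for faster discovery of new topics.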

Also, if you create topics up front, or start the producer before the consumer, this behavior should not happen at all.

answered Oct 22 '22 21:10 by Sönke Liebau