Kafka consumer startup delay confluent dotnet

When starting up a confluent-dotnet consumer, after the call to Subscribe and subsequent polling, it takes a very long time (about 10-15 sec) to receive the "Partition assigned" event from the server, and therefore to receive messages.

At first I thought there was an auto topic creation overhead, but the time is the same whether or not the topic and consumer group already exist.

I start my consumer with this config; the rest of the code is the same as in the confluent advanced consumer example:

    var kafkaConfig = new Dictionary<string, object>
    {
        {"group.id", config.ConsumerGroup},
        {"statistics.interval.ms", 60000},
        {"fetch.wait.max.ms", 10},
        {"bootstrap.servers", config.BrokerList},
        {"enable.auto.commit", config.AutoCommit},
        {"socket.blocking.max.ms", 1},
        {"fetch.error.backoff.ms", 1},
        {"socket.nagle.disable", true},
        {"auto.commit.interval.ms", 5000},
        {
            "default.topic.config", new Dictionary<string, object>()
            {
                {"auto.offset.reset", "smallest"}
            }
        }
    };

The kafka cluster consists of 3 low-mid spec machines in a remote datacenter with default settings. Is there a broker or client setting that can be tweaked to lower this startup time?

EDIT: assigning partitions myself with Assign instead of Subscribe brings the startup time down to around 2 sec.
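For reference, manual assignment with the 0.11.x confluent-kafka-dotnet API looks roughly like this. This is a sketch, not the asker's actual code: the topic name, partition numbers, and deserializer choice are illustrative.

```csharp
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

// kafkaConfig is the dictionary shown in the question.
using (var consumer = new Consumer<Null, string>(
    kafkaConfig, null, new StringDeserializer(Encoding.UTF8)))
{
    // Instead of consumer.Subscribe("my-topic"), assign the partitions
    // directly. This bypasses the group co-ordinator's rebalance wait,
    // which is where the 10-15 sec startup delay comes from.
    consumer.Assign(new List<TopicPartitionOffset>
    {
        new TopicPartitionOffset("my-topic", 0, Offset.Stored),
        new TopicPartitionOffset("my-topic", 1, Offset.Stored)
    });

    // ...then poll as in the advanced consumer example.
}
```

The trade-off: with Assign you give up automatic group rebalancing, so you must decide yourself which consumer instance owns which partition.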

asked Jan 12 '18 by JJ15k



1 Answer

Kafka consumers work in groups by design. The delay you see is the group co-ordinator (which resides on the cluster, not the client side) waiting for any existing/previous session to time out, and for any additional consumers in the same group to start, before allocating partitions to all the consumers with an active connection.

In fact, if you restart your test consumer quickly enough, you'll see that delay jump to almost 30 seconds, because session.timeout.ms has a default value of 30000 and the cluster hasn't "noticed" that the previous consumer has gone until that timeout kicks in. Conversely, if you change group.id between restarts, the delay drops drastically, since the cluster won't wait on existing consumers that belong to a different group.

Finally, make sure you exit your consumer cleanly before starting it again: call Unsubscribe() and ensure the Consumer is disposed.
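A minimal shutdown sequence, assuming the consumer was created as in the question's config:

```csharp
// Leave the group explicitly so the co-ordinator rebalances immediately
// instead of waiting for session.timeout.ms to expire.
consumer.Unsubscribe();
consumer.Dispose();   // or create the consumer inside a using block
```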

It appears that session.timeout.ms can be lowered to 6000 to reduce how long the co-ordinator waits for a stale session to expire, but not lower.
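The 6000 floor comes from the broker side: by default, brokers reject session timeouts below group.min.session.timeout.ms, which defaults to 6000 ms. Adding the setting to the question's config dictionary would look like this (other entries elided):

```csharp
var kafkaConfig = new Dictionary<string, object>
{
    {"group.id", config.ConsumerGroup},
    {"bootstrap.servers", config.BrokerList},
    // Lowest value brokers accept by default
    // (bounded by the broker's group.min.session.timeout.ms).
    {"session.timeout.ms", 6000},
    // ...rest of the config as in the question
};
```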

Even with everything starting "clean" it appears you'll still get a delay of up to 7 seconds (I'm guessing standard connection setup plus waiting for any other consumers in the same group to start). If you use Assign() instead of Subscribe() then you are choosing to assign the partitions to your consumer(s) yourself and automatic group balancing doesn't apply.

answered Oct 25 '22 by zeroid