When starting up a confluent-dotnet consumer, after the call to Subscribe and subsequent polling, it seems to take a very long time to receive the "Partition assigned" event from the server, and therefore messages (about 10-15 seconds).
At first I thought there was some auto topic creation overhead, but the time is the same whether or not the topic/consumer group already exist.
I start my consumer with this config; the rest of the code is the same as in the Confluent advanced consumer example:
var kafkaConfig = new Dictionary<string, object>
{
    {"group.id", config.ConsumerGroup},
    {"statistics.interval.ms", 60000},
    {"fetch.wait.max.ms", 10},
    {"bootstrap.servers", config.BrokerList},
    {"enable.auto.commit", config.AutoCommit},
    {"socket.blocking.max.ms", 1},
    {"fetch.error.backoff.ms", 1},
    {"socket.nagle.disable", true},
    {"auto.commit.interval.ms", 5000},
    {
        "default.topic.config", new Dictionary<string, object>()
        {
            {"auto.offset.reset", "smallest"}
        }
    }
};
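For context, the subscribe-then-poll flow being timed looks roughly like this. This is only a sketch: it uses the newer Confluent.Kafka ConsumerBuilder API rather than the 0.x dictionary config above, and the broker address, group id and topic name are placeholders.

using System;
using Confluent.Kafka;

// Sketch of the subscribe-then-poll flow whose startup latency is in question.
var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "broker1:9092",   // placeholder
    GroupId = "my-consumer-group"        // placeholder
};

using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
    // Fires once the group coordinator has finished the rebalance;
    // this is the "Partition assigned" event whose delay is being measured.
    .SetPartitionsAssignedHandler((c, partitions) =>
        Console.WriteLine($"Partitions assigned: [{string.Join(", ", partitions)}]"))
    .Build();

consumer.Subscribe("my-topic");          // placeholder topic
while (true)
{
    var cr = consumer.Consume(TimeSpan.FromSeconds(1));
    if (cr != null) Console.WriteLine(cr.Message.Value);
}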
The Kafka cluster consists of 3 low- to mid-spec machines in a remote datacenter with default settings. Is there a broker or client setting that can be tweaked to lower this startup time?
EDIT: assigning partitions myself with Assign instead of Subscribe results in a startup time of around 2 seconds instead.
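For reference, a minimal sketch of manual assignment (newer Confluent.Kafka ConsumerBuilder API; broker address, group id, topic name and partition numbers are placeholders). Assign skips the group rebalance protocol entirely, which is why it starts so much faster:

using System;
using System.Collections.Generic;
using Confluent.Kafka;

var assignConfig = new ConsumerConfig
{
    BootstrapServers = "broker1:9092",   // placeholder
    GroupId = "my-consumer-group"        // placeholder
};

using var manualConsumer = new ConsumerBuilder<Ignore, string>(assignConfig).Build();

// No group coordinator round-trip: the listed partitions are taken directly.
manualConsumer.Assign(new List<TopicPartitionOffset>
{
    new TopicPartitionOffset("my-topic", new Partition(0), Offset.Stored),
    new TopicPartitionOffset("my-topic", new Partition(1), Offset.Stored)
});

var first = manualConsumer.Consume(TimeSpan.FromSeconds(5));
Console.WriteLine(first?.Message.Value ?? "<no message within 5s>");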
How about increasing the number of partitions of the topic and also increasing the number of consumers, up to one per partition? Consuming concurrently can increase throughput. If you store offsets in ZooKeeper, it can become a bottleneck; reduce the frequency of offset commits and use a dedicated ZooKeeper ensemble if possible.
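A rough sketch of that idea, assuming the newer Confluent.Kafka API and placeholder broker/group/topic names: several consumers in the same group run concurrently, and the group coordinator spreads the topic's partitions across them.

using System;
using System.Threading.Tasks;
using Confluent.Kafka;

const int partitionCount = 3;                // assumed number of partitions on the topic

var workers = new Task[partitionCount];
for (int i = 0; i < partitionCount; i++)
{
    workers[i] = Task.Run(() => RunGroupMember());
}
Task.WaitAll(workers);

static void RunGroupMember()
{
    var memberConfig = new ConsumerConfig
    {
        BootstrapServers = "broker1:9092",   // placeholder
        GroupId = "my-consumer-group",       // same group => partitions are shared out
        EnableAutoCommit = true,
        AutoCommitIntervalMs = 5000          // fewer, batched offset commits
    };

    using var member = new ConsumerBuilder<Ignore, string>(memberConfig).Build();
    member.Subscribe("my-topic");            // placeholder topic
    while (true)
    {
        var cr = member.Consume(TimeSpan.FromSeconds(1));
        if (cr != null)
            Console.WriteLine($"partition {cr.Partition.Value}: {cr.Message.Value}");
    }
}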
To add the delay, all we need to do is call Thread.Sleep(<time in ms>). This makes the thread that runs the Kafka listener sleep and stop consuming any more messages until the time passes. The message can then be pushed back into the main processing queue.
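As a rough C# illustration of that approach (newer Confluent.Kafka API, placeholder names): sleeping in the consume loop before handling a message simply stops the application from pulling further messages while the thread is blocked.

using System;
using System.Threading;
using Confluent.Kafka;

var delayConfig = new ConsumerConfig
{
    BootstrapServers = "broker1:9092",   // placeholder
    GroupId = "delayed-consumer-group"   // placeholder
};

using var delayedConsumer = new ConsumerBuilder<Ignore, string>(delayConfig).Build();
delayedConsumer.Subscribe("my-topic");   // placeholder topic

while (true)
{
    var cr = delayedConsumer.Consume(TimeSpan.FromSeconds(1));
    if (cr == null) continue;

    // Block before handling; no further messages are handed to the application
    // while this thread sleeps (librdkafka may still prefetch in the background).
    // Very long sleeps risk exceeding max.poll.interval.ms and forcing a rebalance.
    Thread.Sleep(5000);
    Console.WriteLine(cr.Message.Value);
}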
The reasons for that may vary from inefficient processing to issues with the data store, network problems, and more. Basically, anything that slows down consuming data from the Kafka broker will cause consumer lag, making the application fall behind in processing the data.
With Kafka's per-partition ordering, per-message delays are not possible (you can only apply the same delay to an entire partition). This feature is usually abused in other technologies to do synchronize-by-sleep, which is a poor synchronization method.
Kafka consumers work in groups by design - the delay you see is the group coordinator (which resides on the cluster, not the client side) waiting for any existing/previous session(s) to time out and allowing any additional consumers in the same group to start before allocating partitions to all the consumers with an active connection.
In fact, if you re-start your test consumer quickly enough, you'll see that delay jump to almost 30 seconds, because session.timeout.ms has a default value of 30000 and the cluster still hasn't "noticed" that the previous consumer has gone until this timeout kicks in. Also, if you change group.id between restarts you'll see the delay drop drastically, as the cluster won't wait on existing consumers that are part of a different group.
Finally, try cleanly exiting your consumer before firing it up again (call Unsubscribe() and make sure the Consumer is disposed).
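A minimal clean-shutdown sketch, again assuming the newer Confluent.Kafka API (placeholder names); leaving the group explicitly means the coordinator does not have to wait out session.timeout.ms on the next start:

using System;
using System.Threading;
using Confluent.Kafka;

var shutdownConfig = new ConsumerConfig
{
    BootstrapServers = "broker1:9092",   // placeholder
    GroupId = "my-consumer-group"        // placeholder
};

var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

using (var consumer = new ConsumerBuilder<Ignore, string>(shutdownConfig).Build())
{
    consumer.Subscribe("my-topic");      // placeholder topic
    try
    {
        while (true)
        {
            var cr = consumer.Consume(cts.Token);
            Console.WriteLine(cr.Message.Value);
        }
    }
    catch (OperationCanceledException) { }
    finally
    {
        // Stop fetching and leave the group cleanly before disposal.
        consumer.Unsubscribe();
        consumer.Close();
    }
}   // Dispose() runs here via the using block.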
It appears that session.timeout.ms can be lowered to 6000 (but no lower) to reduce the timeout of any existing consumer group connection.
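For example, adding the property to the question's dictionary-style config would look like this; 6000 corresponds to the broker-side default minimum, group.min.session.timeout.ms, so lower values are rejected (the other entries are as in the question):

var kafkaConfig = new Dictionary<string, object>
{
    {"group.id", config.ConsumerGroup},
    {"bootstrap.servers", config.BrokerList},
    // Evict a previous, non-cleanly-closed consumer from the group sooner.
    {"session.timeout.ms", 6000},
    // ... remaining settings as in the question ...
};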
Even with everything starting "clean" it appears you'll still get a delay of up to 7 seconds (I'm guessing standard connection setup plus waiting for any other consumers in the same group to start). If you use Assign() instead of Subscribe() then you are choosing to assign the partitions to your consumer(s) yourself and automatic group balancing doesn't apply.