I have a Kafka consumer that I create on a schedule. It attempts to consume all of the new messages that have been added since the last commit was made.
I would like to shut the consumer down once it consumes all of the new messages in the log instead of waiting indefinitely for new messages to come in.
I'm having trouble finding a solution via Kafka's documentation.
I see a number of timeout related properties available in the Confluent.Kafka.ConsumerConfig and ClientConfig classes, including FetchWaitMaxMs, but am unable to decipher which to use. I'm using the .NET client.
Any advice would be appreciated.
When a consumer commits an offset to Kafka, it records how far it has read; the next time a consumer in the same group starts, consumption resumes from the committed offset. Within a running session, poll() advances through the log in memory regardless of commits, but if the consumer never commits, it will receive the same messages again after a restart or rebalance.
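To make that concrete, here is a minimal sketch of explicit offset commits with the Confluent.Kafka .NET client. The broker address, group id, and topic name are placeholders, not values from the question:

```csharp
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",  // placeholder
    GroupId = "scheduled-consumer",       // placeholder
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = false              // commit explicitly after processing
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("my-topic");           // placeholder

var result = consumer.Consume(TimeSpan.FromSeconds(5));
if (result != null)
{
    // process result.Message.Value ...
    consumer.Commit(result);  // records result.Offset + 1 as the group's position
}
consumer.Close();
```

Committing after processing (rather than auto-committing on an interval) gives at-least-once delivery: a crash between processing and commit means the message is redelivered, never silently skipped.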
I have found a solution. Version 1.0.0-beta2 of Confluent's .NET Kafka library provides a method called .Consume(TimeSpan timeout). It returns null if no message arrives within the timeout, i.e. when there are no new messages to consume or we're at the partition EOF. I was previously using the .Consume(CancellationToken cancellationToken) overload, which blocked indefinitely and prevented me from shutting down the consumer. More here: https://github.com/confluentinc/confluent-kafka-dotnet/issues/614#issuecomment-433848857
Another option is to upgrade to version 1.0.0-beta3, which adds a boolean flag on the ConsumeResult object called IsPartitionEOF. This is what I was initially looking for: a way to know when I've reached the end of the partition.
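Putting the two together, here is a minimal sketch of the shutdown loop. It assumes a 1.0+ release of Confluent.Kafka (where the beta-era API settled into ConsumerBuilder); the broker address, group id, and topic name are placeholders:

```csharp
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",  // placeholder
    GroupId = "scheduled-consumer",       // placeholder
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnablePartitionEof = true  // emit an EOF result when a partition is exhausted
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("my-topic");           // placeholder

while (true)
{
    // Returns null if nothing (not even an EOF event) arrives within the timeout.
    var result = consumer.Consume(TimeSpan.FromSeconds(5));
    if (result == null || result.IsPartitionEOF)
        break;  // caught up: shut down instead of waiting for new messages

    // process result.Message.Value ...
}
consumer.Close();  // leave the group cleanly and commit final offsets
```

Note that IsPartitionEOF fires per partition, so with a multi-partition topic, breaking on the first EOF can exit before the other assigned partitions are drained; you may want to track EOF per partition and stop only once every assigned partition has reported it.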
I have never used the .NET client, but assuming it isn't all that different from the Java client, the poll() method should accept a timeout value in milliseconds, so setting that to 5000 should work in most cases. There's no need to fiddle with the config classes.
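The .NET client does mirror this: its Consume method has an overload taking a timeout in milliseconds (alongside the TimeSpan one) and returns null when nothing arrives in time. A minimal sketch; the broker address, group id, and topic name are placeholders:

```csharp
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",  // placeholder
    GroupId = "scheduled-consumer"        // placeholder
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("my-topic");           // placeholder

while (true)
{
    var result = consumer.Consume(5000);  // timeout in milliseconds
    if (result == null)
        break;  // nothing arrived within 5 s: assume we're caught up

    // process result.Message.Value ...
}
consumer.Close();
```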
Another approach is to find the maximum offset at the time that your consumer is created, and only read up until that offset. This would theoretically prevent your consumer from running indefinitely if, by any chance, it is not consuming as fast as producers produce. But I have never tried that approach.
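That idea can be sketched with the .NET client's QueryWatermarkOffsets, which returns a partition's low and high watermarks; the high watermark is the offset one past the last message at query time. The broker address and topic name are placeholders, and for simplicity this handles a single, manually assigned partition:

```csharp
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",  // placeholder
    GroupId = "scheduled-consumer"        // placeholder
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
var tp = new TopicPartition("my-topic", 0);  // placeholder; one partition only
consumer.Assign(tp);

// Snapshot the end of the partition before consuming anything.
var watermarks = consumer.QueryWatermarkOffsets(tp, TimeSpan.FromSeconds(5));
long end = watermarks.High.Value;  // offset one past the last message right now

while (true)
{
    var result = consumer.Consume(TimeSpan.FromSeconds(5));
    if (result == null)
        break;  // timed out before reaching the snapshot; bail out anyway

    // process result.Message.Value ...

    if (result.Offset.Value + 1 >= end)
        break;  // consumed everything that existed when we started
}
consumer.Close();
```

The snapshot bounds the run even if producers keep writing: messages appended after the query are simply left for the next scheduled run.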