How do I stop attempting to consume messages off of Kafka when at the end of the log?

I have a Kafka consumer that I create on a schedule. It attempts to consume all of the new messages that have been added since the last commit was made.

I would like to shut the consumer down once it consumes all of the new messages in the log instead of waiting indefinitely for new messages to come in.

I'm having trouble finding a solution via Kafka's documentation.

I see a number of timeout related properties available in the Confluent.Kafka.ConsumerConfig and ClientConfig classes, including FetchWaitMaxMs, but am unable to decipher which to use. I'm using the .NET client.

Any advice would be appreciated.

asked Feb 14 '19 by Matt Schley


2 Answers

I have found a solution. Version 1.0.0-beta2 of Confluent's .NET Kafka library provides a .Consume(TimeSpan timeout) overload. It returns null when no message arrives within the timeout, i.e. when there is nothing new left to consume. I had previously been using the .Consume(CancellationToken cancellationToken) overload, which blocks indefinitely and was preventing me from shutting the consumer down. More here: https://github.com/confluentinc/confluent-kafka-dotnet/issues/614#issuecomment-433848857
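A minimal sketch of the timeout-based shutdown, written against the released 1.x Confluent.Kafka API (the beta-era construction differed slightly); the broker address, group id, and topic name are placeholders:

```csharp
using System;
using Confluent.Kafka;

class DrainOnTimeout
{
    static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // assumption: local broker
            GroupId = "scheduled-drainer",       // hypothetical group id
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe("my-topic"); // hypothetical topic name

        while (true)
        {
            // Returns null if no message arrives within the timeout.
            var result = consumer.Consume(TimeSpan.FromSeconds(5));
            if (result == null)
                break; // nothing new within 5s - assume we've caught up

            Console.WriteLine($"Consumed: {result.Message.Value}");
        }

        consumer.Close(); // commit final offsets and leave the group cleanly
    }
}
```

One caveat with this approach: a null result only means nothing arrived within the timeout, so a slow broker or a rebalance pause can end the run early.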

Another option is to upgrade to version 1.0.0-beta3, where the ConsumeResult object carries a boolean IsPartitionEOF flag (in current releases, EnablePartitionEof must also be set to true in ConsumerConfig for the flag to be raised). This is what I was initially looking for: a way to know when I've reached the end of the partition.
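A sketch of the IsPartitionEOF approach against the released 1.x API; broker address, group id, and topic name are again placeholders, and this version only handles a single partition:

```csharp
using System;
using Confluent.Kafka;

class DrainToEof
{
    static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // assumption: local broker
            GroupId = "scheduled-drainer",       // hypothetical group id
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnablePartitionEof = true // without this, IsPartitionEOF is never set
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe("my-topic"); // hypothetical topic name

        while (true)
        {
            var result = consumer.Consume(TimeSpan.FromSeconds(10));
            if (result == null)
                continue; // timed out without reaching EOF; keep polling

            if (result.IsPartitionEOF)
            {
                Console.WriteLine($"Reached end of {result.TopicPartition}.");
                break; // for multi-partition topics, track EOF per partition instead
            }

            Console.WriteLine($"Consumed: {result.Message.Value}");
        }

        consumer.Close();
    }
}
```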

answered Oct 17 '22 by Matt Schley


I have never used the .NET client, but assuming it is not all that different from the Java client, the poll() method should accept a timeout value in milliseconds, so setting that to, say, 5000 should work in most cases. No need to fiddle with the config classes.

Another approach is to record the maximum offset at the time your consumer is created and only read up to that offset. This would also prevent your consumer from running indefinitely in the case where producers are writing faster than it can consume, though I have never tried that approach myself.
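In the .NET client, this idea can be sketched with QueryWatermarkOffsets, which returns the low and high watermark for a partition (the high watermark is the offset the next produced message will get). A minimal single-partition sketch, with placeholder broker, group id, and topic:

```csharp
using System;
using Confluent.Kafka;

class DrainToWatermark
{
    static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // assumption: local broker
            GroupId = "scheduled-drainer"        // hypothetical group id
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        var partition = new TopicPartition("my-topic", 0); // hypothetical partition
        consumer.Assign(partition);

        // Snapshot the high watermark at startup; messages produced after this
        // point are deliberately left for the next scheduled run.
        var watermarks = consumer.QueryWatermarkOffsets(
            partition, TimeSpan.FromSeconds(5));
        long endOffset = watermarks.High;

        while (true)
        {
            var result = consumer.Consume(TimeSpan.FromSeconds(5));
            if (result == null)
                break; // timed out before reaching the snapshot; stop anyway

            Console.WriteLine($"Consumed: {result.Message.Value}");

            if (result.Offset.Value >= endOffset - 1)
                break; // consumed the last message that existed at startup
        }

        consumer.Close();
    }
}
```

For a multi-partition topic you would query the watermarks for each assigned partition and stop once every partition has reached its own snapshot.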

answered Oct 17 '22 by Mike Nakis