
Return from Kafka consumer when there is no message

I want to process a topic at application startup using the Confluent .NET client. Consider the following example:

    while (true)
    {
        try
        {
            var cr = c.Consume();
            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
        }
        catch (ConsumeException e)
        {
            Console.WriteLine($"Error occurred: {e.Error.Reason}");
        }
    }

When there is no new message in Kafka, c.Consume blocks. Because I want to use it at application startup (for example, for cache warm-up), I want my code to proceed once I find there are no new messages.

I know there is an overload that takes a timeout, c.Consume(timeout), but the problem with this approach is that if there is a message in the topic and reading it takes longer than the timeout, you receive null, which is not desirable.

Seyed Morteza Mousavi asked Sep 17 '25 08:09



2 Answers

A consumer is not supposed to be aware of the producers, so it cannot ask whether more messages are still coming.

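If you want to know that you have read everything that was in the topic at the moment you started consuming, you can:

  1. Load the newest offset (the high watermark) of each partition before starting to consume.
  2. Then start consuming messages.
  3. Stop consuming once a message's offset reaches the last offset that existed when you loaded the watermark. The high watermark is the offset of the last message plus one, so compare against the newest offset minus one.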

I'm not a C# developer, but from what I read in the Confluent .NET docs you can call QueryWatermarkOffsets on the consumer to get the oldest and newest offsets. https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Consumer.html#Confluent_Kafka_Consumer_QueryWatermarkOffsets_Confluent_Kafka_TopicPartition_

Then, on the Message class, you have an Offset accessor, so the whole thing should not be too hard to achieve. https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Message.html#Confluent_Kafka_Message_Offset
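
A minimal sketch of these steps, assuming the Confluent.Kafka 1.x client. The broker address, group id, topic name, and the single-partition list are hypothetical placeholders, and the loop assumes every offset below the high watermark is eventually delivered as a message (this may not hold if the last record in a partition is a transaction marker).

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;

class WarmUpByWatermark
{
    static void Main()
    {
        // Hypothetical connection settings; replace with your own.
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "cache-warm-up",
            EnableAutoCommit = false
        };
        var timeout = TimeSpan.FromSeconds(10);

        // Assumption: the topic has a single partition (0). List every partition you need.
        var partitions = new[] { new TopicPartition("my-topic", 0) };

        using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            // Step 1: record the last existing offset of each partition.
            var lastOffsets = new Dictionary<TopicPartition, long>();
            var startPositions = new List<TopicPartitionOffset>();
            foreach (var tp in partitions)
            {
                WatermarkOffsets w = c.QueryWatermarkOffsets(tp, timeout);
                // High is the offset the next produced message will get,
                // so the last existing message sits at High - 1.
                // An empty partition (High == Low) needs no reading at all.
                if (w.High.Value > w.Low.Value)
                {
                    lastOffsets[tp] = w.High.Value - 1;
                }
                startPositions.Add(new TopicPartitionOffset(tp, w.Low));
            }

            // Step 2: start consuming from the oldest available offset.
            c.Assign(startPositions);

            // Step 3: stop once every partition has reached its recorded offset.
            while (lastOffsets.Count > 0)
            {
                try
                {
                    var cr = c.Consume(timeout);
                    if (cr == null)
                    {
                        continue; // Timed out with messages still expected; keep waiting.
                    }

                    Console.WriteLine($"Consumed message '{cr.Message.Value}' at: '{cr.TopicPartitionOffset}'.");

                    if (lastOffsets.TryGetValue(cr.TopicPartition, out long last)
                        && cr.Offset.Value >= last)
                    {
                        lastOffsets.Remove(cr.TopicPartition);
                    }
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Error occurred: {e.Error.Reason}");
                }
            }

            c.Close();
        }
    }
}
```

Because the loop exits on offsets rather than on a timeout, a slow fetch only delays startup instead of ending it early. The comparison uses >= rather than == so that a gap in the offsets does not make the loop miss its stop condition.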

Francois answered Sep 21 '25 03:09



You can use the OnPartitionEOF event, which is raised when the consumer reaches the end of a partition.

    CancellationTokenSource source = new CancellationTokenSource();
    bool isContinue = true;

    // Raised when the consumer reaches the end of a partition.
    c.OnPartitionEOF += (o, e) =>
    {
        Console.WriteLine("You have reached end of partition");
        isContinue = false;
        source.Cancel();
    };

    while (isContinue)
    {
        try
        {
            var cr = c.Consume(source.Token);
            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
        }
        catch (ConsumeException e)
        {
            Console.WriteLine($"Error occurred: {e.Error.Reason}");
        }
        catch (OperationCanceledException)
        {
            // Consume(token) throws once the token is cancelled; treat that as the stop signal.
            break;
        }
    }
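
In the 1.x releases of Confluent.Kafka, as far as I know, the same signal is exposed through the EnablePartitionEof config property and the IsPartitionEOF flag on the consume result, rather than through an event. A minimal sketch under that assumption follows; the broker address, group id, and topic name are hypothetical placeholders.

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;

class WarmUpByPartitionEof
{
    static void Main()
    {
        // Hypothetical connection settings; replace with your own.
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "cache-warm-up",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = false,
            // Ask the client to emit an end-of-partition result
            // each time it reaches the end of a partition.
            EnablePartitionEof = true
        };

        using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            c.Subscribe("my-topic");

            // Partitions for which the end has been reported at least once.
            var finished = new HashSet<TopicPartition>();

            while (true)
            {
                try
                {
                    var cr = c.Consume();

                    if (cr.IsPartitionEOF)
                    {
                        finished.Add(cr.TopicPartition);
                        // Stop only when every assigned partition has reported its end.
                        if (finished.Count >= c.Assignment.Count)
                        {
                            break;
                        }
                        continue;
                    }

                    Console.WriteLine($"Consumed message '{cr.Message.Value}' at: '{cr.TopicPartitionOffset}'.");
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Error occurred: {e.Error.Reason}");
                }
            }

            c.Close();
        }
    }
}
```

Tracking the finished partitions in a set avoids stopping at the first end-of-partition signal when the topic has more than one partition.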

Amin answered Sep 21 '25 04:09
