 

How to make the Consume method non-blocking in Confluent Kafka for .NET

I use the Confluent Kafka .NET library, version 1.2.1, and have implemented a consumer to consume messages from a topic. The problem is that the Consume method blocks the main thread and keeps waiting until a message is posted. I would like to make it non-blocking or have it run in parallel. Could someone help me with this?

Below is the code I'm using for the consumer:

// config, topicName and cancellationToken are defined elsewhere in the application.
using (var consumer = new ConsumerBuilder<Ignore, string>(config)
    .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
    .SetPartitionsAssignedHandler((c, partitions) =>
    {
        Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
    })
    .SetPartitionsRevokedHandler((c, partitions) =>
    {
        Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
    })
    .Build())
{
    consumer.Subscribe(topicName);

    try
    {
        while (true)
        {
            try
            {
                // Blocks until a message is available or cancellationToken is cancelled.
                var consumeResult = consumer.Consume(cancellationToken);

                if (consumeResult != null)
                {
                    if (consumeResult.IsPartitionEOF)
                    {
                        Console.WriteLine($"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");

                        continue;
                    }

                    Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Value}");

                    Console.WriteLine($"Received message => {consumeResult.Value}");
                }
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Consume error: {e.Error.Reason}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Consume throws when cancellationToken is cancelled; leave the group cleanly
        // (committing final offsets if auto-commit is enabled) before disposing.
        consumer.Close();
    }
}

I couldn't find an async method on the consumer, although the producer has ProduceAsync for this purpose.

Asked by Annamalai D, Jan 25 '23 12:01


1 Answer

There's no async consume (yet). You might not be aware that calling Consume simply returns messages from an internal queue; it doesn't correspond to a network fetch from a Kafka broker. Messages are fetched in background threads, and caching is very aggressive by default (tunable via the parameters outlined here: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). If messages are available, the consumer can deliver them to your application at more than 500,000 messages per second (for small message sizes). To achieve parallelism, use multiple consumers (generally in separate processes; note that it's best to fully utilize each consumer for maximum efficiency), or use multiple threads to process messages after they have been consumed.
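The second option (one consume loop handing messages to several processing threads) can be sketched roughly as follows. This is a minimal sketch under stated assumptions, not a Confluent API: a `BlockingCollection<string>` is a hypothetical stand-in for the consumer's internal queue (in real code the loop would call `consumer.Consume(ct)` instead), and the channel capacity, the worker count of 4 and the `ToUpperInvariant` "work" are arbitrary placeholders. It uses `System.Threading.Channels` and top-level statements, so it assumes .NET 5 or later.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for the consumer's internal queue, pre-filled with 20 messages.
// In real code the consume loop below would call consumer.Consume(ct) instead.
var source = new BlockingCollection<string>();
for (int i = 0; i < 20; i++) source.Add($"message-{i}");
source.CompleteAdding();

// Bounded channel: if the workers fall behind, WriteAsync waits (back-pressure).
var channel = Channel.CreateBounded<string>(capacity: 100);
var processed = new ConcurrentBag<string>();

// Single consume loop: only this task reads from the source.
Task consumeLoop = Task.Run(async () =>
{
    foreach (var msg in source.GetConsumingEnumerable())
        await channel.Writer.WriteAsync(msg);
    channel.Writer.Complete(); // lets the workers' ReadAllAsync loops finish
});

// Several workers process messages in parallel after they have been consumed.
const int workerCount = 4;
Task[] workers = Enumerable.Range(0, workerCount)
    .Select(_ => Task.Run(async () =>
    {
        await foreach (var msg in channel.Reader.ReadAllAsync())
            processed.Add(msg.ToUpperInvariant()); // placeholder for real processing
    }))
    .ToArray();

await consumeLoop;
await Task.WhenAll(workers);
Console.WriteLine($"Processed {processed.Count} messages");
```

Two caveats with this design: messages from the same partition can be processed out of order across workers, and with the default `enable.auto.commit=true` offsets may be committed for messages a worker hasn't finished yet, so you may need to take control of offset storage yourself.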

With the above said, an async consume method would be useful in cases where you want to block on multiple types of I/O simultaneously (e.g. Kafka consume and HTTP responses). It would also be useful for fitting in with standard C# patterns, in particular hosted services. For these reasons, we'd like to add it in the future.

For now, for the first use case, you can just use another thread (an extra thread that isn't busy isn't going to materially affect performance):

var t = Task.Run(() => consumer.Consume(ct));
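The `Task.Run` one-liner can be expanded into a sketch of the first use case: waiting on a consume and some other I/O at the same time with `Task.WhenAny`. To keep it self-contained, a `BlockingCollection<string>` stands in for the consumer (the hypothetical `BlockingConsume` plays the role of `consumer.Consume(ct)`) and a delayed task stands in for an HTTP call; neither is a Confluent API. Top-level statements assume .NET 5 or later.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for consumer.Consume(ct): blocks until an item is queued.
var internalQueue = new BlockingCollection<string>();
string BlockingConsume(CancellationToken ct) => internalQueue.Take(ct);

using var cts = new CancellationTokenSource();

// Offload the blocking call to a thread-pool thread so the caller can await it.
Task<string> consumeTask = Task.Run(() => BlockingConsume(cts.Token));

// Another kind of I/O to wait on at the same time (simulated HTTP response).
Task<string> httpTask = Task.Delay(50).ContinueWith(_ => "http response");

// Wait for whichever completes first; nothing is queued yet, so the HTTP task wins.
Task<string> first = await Task.WhenAny(consumeTask, httpTask);
string firstResult = await first;
Console.WriteLine($"First completed: {firstResult}");

// Simulate a Kafka message arriving later.
internalQueue.Add("kafka message");

// Await the same pending task rather than starting a second Consume call.
string message = await consumeTask;
Console.WriteLine($"Then consumed: {message}");
```

The important detail is keeping a single Consume call in flight: if the other I/O completes first, await the same `consumeTask` later instead of calling `Task.Run(() => consumer.Consume(ct))` again, since the consumer isn't meant to have Consume called concurrently from several threads.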

For the second use case, just set up a long-running thread instead.
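A long-running consume loop for the second use case might look like the sketch below. `Task.Factory.StartNew` with `TaskCreationOptions.LongRunning` asks the default scheduler for a dedicated thread rather than tying up a thread-pool thread. As before, a `BlockingCollection<string>` is a hypothetical stand-in for the consumer: in real code the loop body would call `consumer.Consume(ct)` and the cancellation handler would call `consumer.Close()`. Top-level statements assume .NET 5 or later.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the consumer's internal queue.
var internalQueue = new BlockingCollection<string>();
var received = new ConcurrentQueue<string>();

void ConsumeLoop(CancellationToken ct)
{
    try
    {
        while (true)
        {
            string msg = internalQueue.Take(ct); // real code: consumer.Consume(ct)
            received.Enqueue(msg);
        }
    }
    catch (OperationCanceledException)
    {
        // Shutdown requested; real code would call consumer.Close() here.
    }
}

using var cts = new CancellationTokenSource();

// LongRunning hints that the loop should get its own thread, not a pool thread.
Task loop = Task.Factory.StartNew(
    () => ConsumeLoop(cts.Token),
    cts.Token,
    TaskCreationOptions.LongRunning,
    TaskScheduler.Default);

internalQueue.Add("a");
internalQueue.Add("b");

// Wait until both messages have been handled, then stop the loop.
while (received.Count < 2) await Task.Delay(10);
cts.Cancel();
await loop;
Console.WriteLine($"Loop stopped after {received.Count} messages");
```

In an ASP.NET Core hosted service, one option is to start this loop from `BackgroundService.ExecuteAsync`, passing its `stoppingToken` as `ct` and returning the resulting task.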

Answered by Matt Howlett, Jan 28 '23 14:01