I use Confluent Kafka .Net library Version 1.2.1, I have implemented consumer to consume messages from a topic, the problem is Consume method blocks the main thread and keeps waiting until a message is posted, but I would like to make it non blocking or to run in parallel. could some one help me on this?
below is the code I'm using for Consumer
using (var consumer = new ConsumerBuilder<Ignore, string>(config)
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.SetPartitionsAssignedHandler((c, partitions) =>
{
Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
})
.SetPartitionsRevokedHandler((c, partitions) =>
{
Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
})
.Build())
{
consumer.Subscribe(topicName);
while (true)
{
try
{
var consumeResult = consumer.Consume(cancellationToken);
if (consumeResult!=null)
{
if (consumeResult.IsPartitionEOF)
{
Console.WriteLine($"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
continue;
}
Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Value}");
Console.WriteLine($"Received message => {consumeResult.Value}");
}
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error.Reason}");
}
}
}
I couldn't find an Async method for consumer but Producer has ProduceAsync to serve the purpose.
There's no async consume (yet). You might not be aware that calling consume simply returns messages from an internal queue - doesn't correspond to a network fetch to a Kafka broker. Messages are fetched in background threads, and caching is very aggressive by default (tunable via parameters outlined here: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). If there are messages available, the consumer can deliver them to your application at > 500,000 msgs / second (for small message sizes). To achieve parallelism, use multiple consumers (generally in separate processes, and note it's best to try to fully utilize each consumer for maximum efficiency), or use multiple threads after consuming a message.
With the above said, an async consume method would be useful in cases where you want to block on multiple types of IO simultaneously (e.g. kafka consume and http responses). It would also be useful to fit in with standard C# patterns, in particular hosted services. For these reasons, we'll like to add it in the future.
For now, for the first use case, you can just use another thread (an extra thread that isn't busy isn't going to materially affect performance):
Task t = Task.Run(() => consumer.Consume(ct);
For the second use case, just set up a long running thread instead.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With