I need my consumer to consume from a specific TopicPartitionOffset (here, from offset 278).
Suppose that messages have already been produced by some producer to a specific topic, such as "Test_1".
Here is my code:
    using System;
    using Confluent.Kafka;

    public class ConsumingTest
    {
        public static void Main(string[] args)
        {
            var consumerConfig = new ConsumerConfig
            {
                BootstrapServers = "localhost:9092", EnableAutoCommit = false, GroupId = "this-group-id"
            };
            using (var consumer = new Consumer<Null, string>(consumerConfig))
            {
                Console.WriteLine("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~Consume Started...");
                consumer.Subscribe("Test_1");
                var topicPartitionOffset = new TopicPartitionOffset("Test_1", new Partition(0), new Offset(278));
                consumer.Assign(topicPartitionOffset);
                consumer.Seek(topicPartitionOffset);
                while (true)
                    try
                    {
                        var cr = consumer.Consume();
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine(e.Message);
                    }
            }
        }
    }
At the line var cr = consumer.Consume(); the consumer starts consuming, but nothing happens. What is the problem?
I have already tried setting AutoOffsetReset = AutoOffsetResetType.Earliest in ConsumerConfig. With that setting the consumer consumes all messages from all offsets, but that is not what I'm looking for.
TopicPartitionOffset is a container for a topic name, a partition number and an offset within that partition.
It’s simple to consume messages from Apache Kafka with the .NET client. Confluent.Kafka is a lightweight wrapper around librdkafka that provides an easy interface for a consumer client, which subscribes to a topic and polls for messages/events as required.
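Solved: I found the solution, described below.
I added this before trying to consume:
consumer.Assign(new TopicPartitionOffset(topicName, 0, new Offset(lastConsumedOffset)))
and removed these calls:
consumer.Subscribe("Test_1")
consumer.Seek(...)
Subscribe() asks the group coordinator for a partition assignment, which conflicts with a manual Assign(), and Assign() with a TopicPartitionOffset already sets the starting position, so Seek() is not needed.
So the updated code is something like this, which works perfectly: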
    using (var consumer = new Consumer<Ignore, string>(config))
    {
        // Manually assign partition 0 and start reading at the stored offset.
        consumer.Assign(new TopicPartitionOffset(topicName, 0, new Offset(lastConsumedOffset)));
        while (true)
        {
            try
            {
                var consumeResult = consumer.Consume();
                Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Value}");
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Consume error: {e.Error}");
            }
        }
        // Not reached while the loop runs forever; add an exit path to close cleanly.
        consumer.Close();
    }
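For readers on Confluent.Kafka 1.0 or later, where consumers are created through ConsumerBuilder rather than the Consumer constructor used above, the same idea might look like the sketch below. It is an illustration only, not the original poster's code: the broker address, group id, topic name and offset 278 are taken from the question, while the class name ConsumeFromOffset and the Ctrl+C handling are assumptions added so the loop has a way to exit and reach Close().

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Hypothetical class name, used only for this sketch.
public static class ConsumeFromOffset
{
    public static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "this-group-id",
            EnableAutoCommit = false
        };

        using (var cts = new CancellationTokenSource())
        using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            // Assumption: stop on Ctrl+C instead of looping forever.
            Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

            // Manual assignment: partition 0 of "Test_1", starting at offset 278.
            // No Subscribe() and no Seek() are used alongside it.
            consumer.Assign(new TopicPartitionOffset("Test_1", new Partition(0), new Offset(278)));

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = consumer.Consume(cts.Token);
                        Console.WriteLine($"Received message at {cr.TopicPartitionOffset}: {cr.Message.Value}");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Consume error: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Raised by Consume(cts.Token) once Ctrl+C cancels the token.
            }
            finally
            {
                // Leave the group and release resources cleanly.
                consumer.Close();
            }
        }
    }
}
```

Running this requires the Confluent.Kafka NuGet package and a broker reachable at the configured address; if partition 0 holds no message at or after offset 278, Consume() simply blocks until one arrives or the token is cancelled.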