
How to consume from a specific TopicPartitionOffset with Confluent.Kafka in .NET

I need my consumer to start consuming from a specific TopicPartitionOffset (here, offset 278). Assume that messages have already been produced by some producer to a specific topic, e.g. "Test_1". Here is my code:

using System;
using Confluent.Kafka;

public class ConsumingTest
{
    public static void Main(string[] args)
    {
        var consumerConfig = new ConsumerConfig
                                 {
                                     BootstrapServers = "localhost:9092", EnableAutoCommit = false, GroupId = "this-group-id"
                                 };

        using (var consumer = new Consumer<Null, string>(consumerConfig))
        {
            Console.WriteLine("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~Consume Started...");
            consumer.Subscribe("Test_1");

            var topicPartitionOffset = new TopicPartitionOffset("Test_1", new Partition(0), new Offset(278));

            consumer.Assign(topicPartitionOffset);
            consumer.Seek(topicPartitionOffset);

            while (true)
                try
                {
                    var cr = consumer.Consume();

                    Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine(e.Message);
                }
        }
    }
}

At the line var cr = consumer.Consume(); the consumer starts consuming, but nothing happens. What is the problem?

I have already tried setting AutoOffsetReset = AutoOffsetResetType.Earliest in ConsumerConfig. With that setting the consumer consumes all messages from every offset, but that is not what I'm looking for.

Hassan Ahmadi asked Dec 29 '18 10:12


1 Answer

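Solved. I found the solution, described below:

  • Added this before trying to consume:

consumer.Assign(new TopicPartitionOffset(topicName, 0, new Offset(lastConsumedOffset)))

  • Removed these:

consumer.Subscribe("Test_1") and consumer.Seek(...)

Assign with an explicit offset already tells the consumer where to start reading, so the Seek call is not needed, and Subscribe (group-managed assignment) should not be combined with a manual Assign.

So the updated code looks something like this, and it works: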

using (var consumer = new Consumer<Ignore, string>(config))
{
    // Manually assign partition 0 and start reading at the given offset.
    consumer.Assign(new TopicPartitionOffset(topicName, 0, new Offset(lastConsumedOffset)));

    // Runs until the process is terminated.
    while (true)
    {
        try
        {
            var consumeResult = consumer.Consume();
            Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Value}");
        }
        catch (ConsumeException e)
        {
            Console.WriteLine($"Consume error: {e.Error}");
        }
    }
}
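The snippet above was written against the pre-release client that was current when the question was asked. As a rough sketch only, assuming Confluent.Kafka 1.x or later (where consumers are created through ConsumerBuilder rather than a public Consumer constructor), the same approach might look like the following. The broker address, group id, topic name and offset 278 are taken from the question; the class name and the Ctrl+C handling are illustrative additions, not part of the original answer.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

public static class AssignAtOffsetSketch
{
    public static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "this-group-id",
            EnableAutoCommit = false
        };

        using (var cts = new CancellationTokenSource())
        using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            // Illustrative addition: let Ctrl+C stop the loop cleanly.
            Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

            // Manual assignment with an explicit starting offset; no Subscribe, no Seek.
            consumer.Assign(new TopicPartitionOffset("Test_1", new Partition(0), new Offset(278)));

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = consumer.Consume(cts.Token);
                        Console.WriteLine($"Received message at {cr.TopicPartitionOffset}: {cr.Message.Value}");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Consume error: {e.Error}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Thrown by Consume once the token is cancelled.
            }
            finally
            {
                consumer.Close();
            }
        }
    }
}
```

Because the loop exits through the cancellation token, the Close() call in the finally block is actually reached here.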
Hassan Ahmadi answered Oct 24 '22 18:10