Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Configure kafka-net to stop sending latest messages

I'm using kafka 0.8.1.1 on a Red Hat VM with kafka-net plugin. How can I configure my consumer to stop receiving earlier messages from kafka?

My consumer code:

var options = new KafkaOptions(new Uri("tcp://199.53.249.150:9092"), new Uri("tcp://199.53.249.151:9092"));

Stopwatch sp = new Stopwatch();
var router = new BrokerRouter(options);
var consumer = new Consumer(new ConsumerOptions("Test", router));

ThreadStart start2 = () =>
{
    while (true)
    {
        sp.Start();
        foreach (var message in consumer.Consume())
        {
            if (MessageDecoderReceiver.MessageBase(message.Value) != null)
            {
                PrintMessage(MessageDecoderReceiver.MessageBase(message.Value).ToString());
            }
            else
            {
                Console.WriteLine(message.Value);
            }
        }
        sp.Stop();
    }
};
var thread2 = new Thread(start2);
thread2.Start();
like image 947
Lucas Gazire Avatar asked Mar 13 '15 21:03

Lucas Gazire


People also ask

How do you slow down a consumer Kafka?

The best thing to do in Kafka Streams (and also when using a plain KafkaConsumer) is to throttle calls to poll() manually. For Kafka Streams, you can add a Thread. sleep() into any UDF.

What is auto offset reset in Kafka?

Second, use auto. offset. reset to define the behavior of the consumer when there is no committed position (which would be the case when the group is first initialized) or when an offset is out of range. You can choose either to reset the position to the “earliest” offset or the “latest” offset (the default).

What is Autocommit in Kafka?

Auto commit is enabled out of the box and by default commits every five seconds. For a simple data transformation service, “processed” means, simply, that a message has come in and been transformed and then produced back to Kafka.

How do I stop consuming messages from Kafka topic?

Stopping a Kafka Consumer We can use rest api to stop a running consumer. However, we need consumer id to stop the running consumer, so the consumer id needs to be sent. Then try to access the POST http://localhost:8080/api/kafka/registry/deactivate by sending the id parameter of the consumer you want to stop.


1 Answers

The Consumer in Kafka-net does not currently auto track the offsets being consumed. You will have to implement the offset tracking manually.

To Store the offset in kafka version 0.8.1:

 var commit = new OffsetCommitRequest
            {
                ConsumerGroup = consumerGroup,
                OffsetCommits = new List<OffsetCommit>
                            {
                                new OffsetCommit
                                    {
                                        PartitionId = partitionId,
                                        Topic = IntegrationConfig.IntegrationTopic,
                                        Offset = offset,
                                        Metadata = metadata
                                    }
                            }
            };

var commitResponse = conn.Connection.SendAsync(commit).Result.FirstOrDefault();

To set the consumer to start importing at a specific offset point:

var offsets = consumer.GetTopicOffsetAsync(IntegrationConfig.IntegrationTopic).Result
                    .Select(x => new OffsetPosition(x.PartitionId, x.Offsets.Max())).ToArray();

var consumer = new Consumer(new ConsumerOptions(IntegrationConfig.IntegrationTopic, router), offsets);

Note the above code will set the consumer to start consuming at the very end of the log, effectively only receiving new messages.

like image 187
James Roland Avatar answered Oct 12 '22 23:10

James Roland