I'm using Kafka 0.8.1.1 on a Red Hat VM with the kafka-net client. How can I configure my consumer so that it stops receiving earlier messages from Kafka?
My consumer code:
var options = new KafkaOptions(new Uri("tcp://199.53.249.150:9092"), new Uri("tcp://199.53.249.151:9092"));
Stopwatch sp = new Stopwatch();
var router = new BrokerRouter(options);
var consumer = new Consumer(new ConsumerOptions("Test", router));
ThreadStart start2 = () =>
{
    while (true)
    {
        sp.Start();
        foreach (var message in consumer.Consume())
        {
            if (MessageDecoderReceiver.MessageBase(message.Value) != null)
            {
                PrintMessage(MessageDecoderReceiver.MessageBase(message.Value).ToString());
            }
            else
            {
                Console.WriteLine(message.Value);
            }
        }
        sp.Stop();
    }
};
var thread2 = new Thread(start2);
thread2.Start();
In Kafka Streams (and also when using a plain KafkaConsumer), the simplest approach is to throttle calls to poll() manually. In Kafka Streams, you can add a Thread.sleep() call to any user-defined function (UDF).
Second, use auto.offset.reset to define the behavior of the consumer when there is no committed position (which is the case when the group is first initialized) or when an offset is out of range. You can choose to reset the position to either the "earliest" offset or the "latest" offset (the default).
Auto commit is enabled by default and commits offsets every five seconds. For a simple data transformation service, "processed" simply means that a message has come in, been transformed, and been produced back to Kafka.
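As a rough illustration, the settings mentioned above could be written like this in a consumer properties file. This is only a sketch and it assumes the Java consumer (0.9 and later). The older 0.8 consumer uses different names and values (for example auto.commit.enable, and smallest/largest instead of earliest/latest), and kafka-net does not read this file at all.

```properties
# Assumption: Java consumer, 0.9 or later. Not used by kafka-net.
# Where to start when the group has no committed offset or the offset is out of range.
auto.offset.reset=latest
# Commit offsets automatically in the background (this is the default).
enable.auto.commit=true
# Interval between automatic commits, in milliseconds (default is five seconds).
auto.commit.interval.ms=5000
```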
Stopping a Kafka consumer: you can use a REST API to stop a running consumer. This requires the consumer's id, so the id must be sent with the request. Send a POST to http://localhost:8080/api/kafka/registry/deactivate with the id parameter of the consumer you want to stop.
The Consumer in kafka-net does not currently track the offsets it has consumed. You have to implement offset tracking yourself.
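One possible way to do that tracking is sketched below. OffsetTracker is a hypothetical helper, not part of kafka-net, and it makes no kafka-net calls. It only remembers, per partition, the next offset to read. It stores offset + 1 on the assumption that a stored position means "the next message to fetch", and it keeps everything in memory, so the values would still need to be persisted or committed somewhere to survive a restart.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of kafka-net): remembers the next offset to read per partition.
public sealed class OffsetTracker
{
    private readonly ConcurrentDictionary<int, long> _next =
        new ConcurrentDictionary<int, long>();

    // Record that the message at `offset` in `partitionId` has been processed.
    public void MarkProcessed(int partitionId, long offset)
    {
        // Store offset + 1 (the next message to read) and never move backwards,
        // so a late or duplicate message cannot rewind the position.
        _next.AddOrUpdate(
            partitionId,
            offset + 1,
            (_, current) => Math.Max(current, offset + 1));
    }

    // Copy of partition -> next offset, e.g. to commit or to seed a new consumer.
    public IReadOnlyDictionary<int, long> Snapshot()
    {
        return _next.ToDictionary(p => p.Key, p => p.Value);
    }
}
```

In the consume loop, MarkProcessed would be called with the partition id and offset of each message after it has been handled, and Snapshot would supply the values to commit or to pass to a new consumer on restart.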
To store the offset in Kafka 0.8.1:
var commit = new OffsetCommitRequest
{
    ConsumerGroup = consumerGroup,
    OffsetCommits = new List<OffsetCommit>
    {
        new OffsetCommit
        {
            PartitionId = partitionId,
            Topic = IntegrationConfig.IntegrationTopic,
            Offset = offset,
            Metadata = metadata
        }
    }
};
var commitResponse = conn.Connection.SendAsync(commit).Result.FirstOrDefault();
To make the consumer start consuming from a specific offset:
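// `consumer` here is an already-constructed Consumer on the same router, used only to query offsets.
var offsets = consumer.GetTopicOffsetAsync(IntegrationConfig.IntegrationTopic).Result
    .Select(x => new OffsetPosition(x.PartitionId, x.Offsets.Max())).ToArray();
// Create the consumer that will actually read, seeded with the positions found above.
// `tailConsumer` is an illustrative name; it must differ from `consumer` if both are in the same scope.
var tailConsumer = new Consumer(new ConsumerOptions(IntegrationConfig.IntegrationTopic, router), offsets);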
Note that the code above sets the consumer to start at the very end of the log, so it effectively receives only new messages.