I am using Confluent.Kafka .NET client version 1.3.0. I am following the docs:
var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "server1,server2",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = true,               // auto-commit whatever offsets have been stored
    EnableAutoOffsetStore = false,         // but only store offsets explicitly, after processing
    GroupId = this.groupId,
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = this.kafkaUsername,
    SaslPassword = this.kafkaPassword,
};

using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
    var cancellationTokenSource = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true;                   // keep the process alive so we can shut down cleanly
        cancellationTokenSource.Cancel();
    };

    consumer.Subscribe("my-topic");

    while (true)
    {
        try
        {
            var consumerResult = consumer.Consume(cancellationTokenSource.Token);
            // process message
            consumer.StoreOffset(consumerResult);
        }
        catch (ConsumeException e)
        {
            // log
        }
        catch (KafkaException e)
        {
            // log
        }
        catch (OperationCanceledException e)
        {
            // log
            break;                         // Ctrl+C was pressed; exit the consume loop
        }
    }
}
The problem is that even if I comment out the line consumer.StoreOffset(consumerResult);, I keep getting the next unconsumed message the next time I call Consume, i.e. the offset keeps increasing, which doesn't seem to match what the documentation claims, i.e. at-least-once delivery.
Even if I set EnableAutoCommit = false, remove EnableAutoOffsetStore = false from the config, and replace consumer.StoreOffset(consumerResult) with consumer.Commit(), I still see the same behavior: even if I comment out the Commit call, I keep getting the next unconsumed messages.
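In other words, even with StoreOffset and Commit commented out, the consumer's position keeps moving forward. A small sketch of how this can be observed with the code above, comparing the live consumer's position with the committed offset for the partition (Position and Committed are standard IConsumer methods; the ten-second timeout is arbitrary):

var consumerResult = consumer.Consume(cancellationTokenSource.Token);
var partition = consumerResult.TopicPartition;

// Position: the next offset this running consumer will read; it advances on every Consume.
// Committed: the offset a consumer in this group would resume from after a restart or
// rebalance; it only changes when an offset is actually committed.
var position = consumer.Position(partition);
var committed = consumer.Committed(new[] { partition }, TimeSpan.FromSeconds(10));
Console.WriteLine($"position={position}, committed={committed[0].Offset}");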
I feel like I am missing something fundamental here, but I can't figure out what. Any help is appreciated!
The rule in Kafka is that, within a consumer group, each partition of a topic is assigned to at most one consumer at a time, so multiple consumers from the same consumer group cannot read the same message from a partition.
The Kafka cluster retains all published messages, whether or not they have been consumed, for a configurable retention period. For example, if log retention is set to two days, a message is available for consumption for the two days after it is published, after which it is discarded to free up space.
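Retention is a broker- or topic-level setting rather than something the consumer controls. A minimal sketch, assuming you create the topic yourself with the admin client (topic name, partition count, and the two-day value are only examples, and this would run inside an async method):

using System;
using System.Collections.Generic;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

using (var admin = new AdminClientBuilder(
           new AdminClientConfig { BootstrapServers = "server1" }).Build())
{
    await admin.CreateTopicsAsync(new[]
    {
        new TopicSpecification
        {
            Name = "my-topic",
            NumPartitions = 3,
            ReplicationFactor = 1,
            Configs = new Dictionary<string, string>
            {
                { "retention.ms", "172800000" }   // two days, in milliseconds
            }
        }
    });
}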
If a consumer fails before committing, all messages since the last committed offset are delivered by Kafka again and reprocessed. This retry can produce duplicates, because some messages from the last poll() call may already have been processed when the failure happened, right before the auto-commit call.
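The usual at-least-once pattern is therefore to process first and commit (or store the offset) only afterwards. A minimal sketch with manual commits (the group and topic names are placeholders, and ProcessMessage stands in for your processing step):

var cts = new CancellationTokenSource();
var config = new ConsumerConfig
{
    BootstrapServers = "server1",
    GroupId = "my-group",
    EnableAutoCommit = false,                   // nothing gets committed behind our back
    AutoOffsetReset = AutoOffsetReset.Earliest,
};

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
    consumer.Subscribe("my-topic");
    while (!cts.IsCancellationRequested)
    {
        var result = consumer.Consume(cts.Token);
        ProcessMessage(result.Message.Value);   // hypothetical processing step
        consumer.Commit(result);                // commit only after processing succeeded;
                                                // a crash before this line means the message
                                                // is redelivered (possibly as a duplicate) on restart
    }
    consumer.Close();                           // leave the group cleanly
}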
You may want to add retry logic that attempts to process each message a fixed number of times, say 5. If it still fails after those 5 retries, you can publish the message to another topic that collects all failed messages and is handled with priority over your actual topic. Alternatively, you can produce the failed message back onto the same topic so that it is picked up again later, once the other messages have been consumed.
If processing succeeds within those 5 retries, you can move on to the next message in the queue.
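A rough sketch of that retry-then-dead-letter loop (the dead-letter topic name, the retry count, and ProcessMessage are illustrative placeholders; the consumer and cancellation token are assumed from the question's code):

const int MaxRetries = 5;

using (var producer = new ProducerBuilder<Null, string>(
           new ProducerConfig { BootstrapServers = "server1" }).Build())
{
    var result = consumer.Consume(cancellationTokenSource.Token);

    var processed = false;
    for (var attempt = 1; attempt <= MaxRetries && !processed; attempt++)
    {
        try
        {
            ProcessMessage(result.Message.Value);   // hypothetical processing step
            processed = true;
        }
        catch (Exception)
        {
            // log and retry
        }
    }

    if (!processed)
    {
        // Hand the poison message to a dead-letter topic so the consumer can move on;
        // alternatively, produce it back onto the source topic to retry it later.
        producer.Produce("my-topic.dead-letter",
            new Message<Null, string> { Value = result.Message.Value });
        producer.Flush(TimeSpan.FromSeconds(10));
    }

    consumer.StoreOffset(result);                   // the message is now handled either way
}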