
Consume the same message again if processing of the message fails

I am using Confluent.Kafka .NET client version 1.3.0. I am following the docs:

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "server1, server2",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = true,
    EnableAutoOffsetStore = false,
    GroupId = this.groupId,
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = this.kafkaUsername,
    SaslPassword = this.kafkaPassword,
};

using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
    var cts = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true;
        cts.Cancel();
    };

    consumer.Subscribe("my-topic");
    try
    {
        while (true)
        {
            try
            {
                // Pass the token so Ctrl+C actually interrupts the blocking Consume call
                var consumerResult = consumer.Consume(cts.Token);
                // process message
                consumer.StoreOffset(consumerResult);
            }
            catch (ConsumeException e)
            {
                // log
            }
            catch (KafkaException e)
            {
                // log
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Ctrl+C was pressed; fall through to Close()
    }
    finally
    {
        // Leave the group cleanly and commit final stored offsets
        consumer.Close();
    }
}

The problem is that even if I comment out the line consumer.StoreOffset(consumerResult);, the next Consume call still returns the next unconsumed message, i.e. the offset keeps increasing. That doesn't seem to match what the documentation claims, i.e. at-least-once delivery.

Even if I set EnableAutoCommit = false, remove EnableAutoOffsetStore = false from the config, and replace consumer.StoreOffset(consumerResult) with consumer.Commit(), I still see the same behavior: even with the Commit() commented out, I keep getting the next unconsumed messages.

I feel like I am missing something fundamental here, but can't figure what. Any help is appreciated!

asked Mar 08 '20 by havij

People also ask

Can Kafka message be consumed multiple times?

The rule in Kafka is that only one consumer in a consumer group can be assigned to consume messages from a given partition of a topic; hence, multiple consumers from the same consumer group cannot read the same message from a partition.

What happens to Kafka message once it is consumed?

The Kafka cluster retains all published messages, whether or not they have been consumed, for a configurable period of time. For example, if the log retention is set to two days, then for the two days after a message is published it is available for consumption; after that it is discarded to free up space.

What happens if Kafka message is not committed?

If a consumer fails before a commit, all messages after the last commit are received from Kafka and processed again. However, this retry might result in duplicates, as some messages from the last poll() call might have been processed but the failure happened right before the auto-commit call.


1 Answer

What you are seeing is expected: Consume() advances the consumer's in-memory position on every call, regardless of whether you store or commit offsets. Stored/committed offsets only come into play when the consumer restarts or the group rebalances; within a running session you will always be handed the next message unless you explicitly seek back.

For failure handling, you may want retry logic that attempts to process each message a fixed number of times, say 5. If it doesn't succeed within those 5 retries, you can publish the message to a separate topic for failed messages (a dead-letter topic) and handle it out of band, or re-publish it to the same topic so that it is picked up again once the other messages have been consumed.
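A hedged sketch of that retry-then-dead-letter approach, using the same Confluent.Kafka 1.x API as the question. The retry count, the dead-letter topic name "my-topic-dlq", and the processMessage delegate are illustrative assumptions, not part of the original post:

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

public static class RetryingConsumer
{
    private const int MaxRetries = 5; // illustrative; tune for your workload

    public static void Run(ConsumerConfig consumerConfig, ProducerConfig producerConfig,
                           Action<string> processMessage, CancellationToken token)
    {
        using var producer = new ProducerBuilder<Null, string>(producerConfig).Build();
        using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
        consumer.Subscribe("my-topic");

        while (!token.IsCancellationRequested)
        {
            var result = consumer.Consume(token);

            var processed = false;
            for (var attempt = 1; attempt <= MaxRetries && !processed; attempt++)
            {
                try
                {
                    processMessage(result.Message.Value);
                    processed = true;
                }
                catch (Exception)
                {
                    // log and retry; consider an exponential backoff delay here
                }
            }

            if (!processed)
            {
                // Park the poison message on a dead-letter topic ("my-topic-dlq" is an assumption)
                producer.Produce("my-topic-dlq",
                    new Message<Null, string> { Value = result.Message.Value });
            }

            // Mark progress either way so the message is not re-delivered after a restart
            // (relies on EnableAutoCommit = true + EnableAutoOffsetStore = false, as in the question)
            consumer.StoreOffset(result);
        }

        producer.Flush(TimeSpan.FromSeconds(10));
        consumer.Close();
    }
}
```

The key design point is that the offset is stored even when processing ultimately fails: the failed message lives on in the dead-letter topic, so the main consumer can keep making progress.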

If the processing of a message succeeds within those 5 retries, you simply move on to the next message in the queue.
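If instead you want the same session to re-deliver the failed message immediately (the behavior the question expected from commenting out StoreOffset), you have to rewind explicitly, because Consume() always advances the in-memory position. A minimal fragment meant to slot into the question's consume loop; cts is the question's CancellationTokenSource, and ProcessMessage is a hypothetical application handler:

```csharp
ConsumeResult<Ignore, string> result = null;
try
{
    result = consumer.Consume(cts.Token);
    ProcessMessage(result.Message.Value); // hypothetical application handler
    consumer.StoreOffset(result);
}
catch (Exception) when (result != null)
{
    // Rewind this partition to the failed record's offset so that the next
    // Consume() call returns the very same message again. Without a retry cap
    // or dead-letter step, a permanently failing message will loop forever.
    consumer.Seek(result.TopicPartitionOffset);
}
```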

answered Sep 30 '22 by Raju Dasupally