NATS JetStream messages being processed multiple times by my consumer even when messages are acknowledged

I'm using NATS JetStream to publish and consume messages in a .NET 8 application with NATS.Net. After publishing 10 messages, my consumer processes each message twice, despite calling msg.AckAsync(). Interestingly, changing the MaxDeliver setting changes the number of times each message is processed accordingly. If I don't set MaxDeliver, each message is processed only once. Below is my code for both publishing and consuming messages:

using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using NATS.Client.Serializers.Json;
using System.Buffers;
using System.Text;
using System.Text.Json;


try
{
    int count = 0;
    NatsAuthOpts authOpts = new NatsAuthOpts();
    var natsOpts = NatsOpts.Default with
    {
        //SerializerRegistry = new MyProtoBufSerializerRegistry(),
        SerializerRegistry = NatsJsonSerializerRegistry.Default,
        Url = "nats://localhost:53810",
        AuthOpts = authOpts
    };

    string subject = "test.new.*";
    string streamName = "sample-new";

    await using var nats = new NatsConnection(natsOpts);
    var js = new NatsJSContext(nats);

    await js.CreateStreamAsync(new StreamConfig(name: streamName, subjects: new[] { subject }));
     
    // Publish new order messages
    for (var i = 0; i < 10; i++)
    { 
        var message = new SampleEvent("John.Doe", "Sample Message", DateTime.Now.ToString(), i.ToString());
        var ack = await js.PublishAsync("test.new.first", message);
        ack.EnsureSuccess();
    }
    List<long> backOff = new List<long> { 10000 };

    ConsumerConfig consumerConfig = new ConsumerConfig(name: "first-new-consumer");
    consumerConfig.DeliverPolicy = ConsumerConfigDeliverPolicy.All;
    consumerConfig.MaxDeliver = 2;
    consumerConfig.Backoff = backOff;

    var consumer = await js.CreateOrUpdateConsumerAsync(stream: streamName, consumerConfig);

    await foreach (var msg in consumer.ConsumeAsync<SampleEvent>())
    {
        var order = msg.Data;
        Console.WriteLine($"Processing {msg.Data} {order.ToString()}...");
        await msg.AckAsync();
        Console.WriteLine(++count); 
    }
}
catch (Exception ex)
{
    Console.WriteLine(ex.Message);
}

public class SampleEvent
{
    public SampleEvent()
    {
    }

    public SampleEvent(string userName, string message, string utcTime, string count)
    {
        UserName = userName;
        Message = message;
        UtcTime = utcTime;
        Count = count;
    }

    public string UserName { get; set; }
    public string Message { get; set; }
    public string UtcTime { get; set; }
    public string Count { get; set; }
    public override string ToString()
    {
        return JsonSerializer.Serialize(this);
    }
}

I expect each message to be processed only once by the consumer, regardless of the MaxDeliver setting. A message should be processed again only if it was not acknowledged.

Kunal asked Feb 02 '26 07:02


1 Answer

Try making the consumer durable by setting the durable name on the consumer config.

For example:

consumerConfig.DurableName = "your_durable_name";
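
It may also be worth checking the units of the Backoff values. This is a sketch under an assumption, not a confirmed diagnosis: the JetStream API encodes durations as nanoseconds, and since the code in the question compiles with a List<long>, that version of NATS.Net probably passes the numbers through unchanged. If so, 10000 means 10 microseconds rather than 10 seconds. As far as I know, a backoff list also takes the place of the ack wait, so the server could redeliver before the ack arrives, which would match the MaxDeliver behaviour described. ToNanos below is a hypothetical helper, not part of NATS.Net.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not a NATS.Net API): convert a TimeSpan to nanoseconds.
// One .NET tick is 100 nanoseconds.
static long ToNanos(TimeSpan t) => t.Ticks * 100;

// Assumption: Backoff entries are interpreted as nanoseconds.
// Under that assumption, the 10000 from the question is this long:
TimeSpan asWritten = TimeSpan.FromTicks(10000 / 100);
Console.WriteLine(asWritten.TotalMilliseconds); // 0.01 ms, i.e. 10 microseconds

// A 10-second backoff expressed in nanoseconds instead.
List<long> backOff = new List<long> { ToNanos(TimeSpan.FromSeconds(10)) };
Console.WriteLine(backOff[0]); // 10000000000
```

If the Backoff property in your version of the library is typed as a collection of TimeSpan rather than long, the conversion is not needed and TimeSpan.FromSeconds(10) can be used directly.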
Ben Basil answered Feb 03 '26 21:02