Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RabbitMQ - Non Blocking Consumer with Manual Acknowledgement

Tags:

rabbitmq

I'm just starting to learn RabbitMQ so forgive me if my question is very basic.

My problem is actually the same with the one posted here: RabbitMQ - Does one consumer block the other consumers of the same queue?

However, upon investigation, i found out that manual acknowledgement prevents other consumers from getting a message from the queue - blocking state. I would like to know how can I prevent it. Below is my code snippet.

...

var message = receiver.ReadMessage();
Console.WriteLine("Received: {0}", message);

// simulate processing
System.Threading.Thread.Sleep(8000);

receiver.Acknowledge();

public string ReadMessage()
{

   bool autoAck = false;
   Consumer = new QueueingBasicConsumer(Model);
   Model.BasicConsume(QueueName, autoAck, Consumer);
   _ea = (BasicDeliverEventArgs)Consumer.Queue.Dequeue();
   return Encoding.ASCII.GetString(_ea.Body);
}

public void Acknowledge()
{

   Model.BasicAck(_ea.DeliveryTag, false);
}
like image 730
Ranger Avatar asked Jan 02 '26 08:01

Ranger


1 Answers

I modify how I get messages from the queue and it seems blocking issue was fixed. Below is my code.

    public string ReadOneAtTime()
    {
        Consumer = new QueueingBasicConsumer(Model);
        var result = Model.BasicGet(QueueName, false);
        if (result == null) return null;
        DeliveryTag = result.DeliveryTag;
        return Encoding.ASCII.GetString(result.Body);
    }

    public void Reject()
    {
        Model.BasicReject(DeliveryTag, true);
    }

    public void Acknowledge()
    {
        Model.BasicAck(DeliveryTag, false);
    }

Going back to my original question, I added the QOS and noticed that other consumers can now get messages. However some are left unacknowledged and my program seems to hangup. Code changes are below:

    public string ReadMessage()
    {
        Model.BasicQos(0, 1, false); // control prefetch
        bool autoAck = false;
        Consumer = new QueueingBasicConsumer(Model);
        Model.BasicConsume(QueueName, autoAck, Consumer);

        _ea = Consumer.Queue.Dequeue();
        return Encoding.ASCII.GetString(_ea.Body);
    }

    public void AckConsume()
    {
        Model.BasicAck(_ea.DeliveryTag, false);
    }

    In Program.cs
    private static void Consume(Receiver receiver)
    {
        int counter = 0;
        while (true)
        {
            var message = receiver.ReadMessage();
            if (message == null)
            {
                Console.WriteLine("NO message received.");
                break;
            }
            else
            {
                counter++;
                Console.WriteLine("Received: {0}", message);
                receiver.AckConsume();
            }
        }

        Console.WriteLine("Total message received {0}", counter);
    }

I appreciate any comments and suggestions. Thanks!

like image 176
Ranger Avatar answered Jan 06 '26 12:01

Ranger



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!