
RabbitMQ and C#

Tags: c#, rabbitmq

With RabbitMQ, is there a way to use it like MSMQ, where one can pop 1000 messages from the queue, do the inserts into the database, and continue from there?

I cannot seem to do that with a Subscription on a channel. I tried a foreach over the BasicDeliverEventArgs in the Subscription, with an if statement that stops at the maximum number of messages I want to process at a given time.

Thanks in advance.

This, however, still takes all 22k messages from the queue:

using (IConnection connection = factory.CreateConnection())
{
    using (IModel channel = connection.CreateModel())
    {
        channel.QueueDeclare("****", true, false, false, null);

        var subscription = new Subscription(channel, "****", false);
        int maxMessages = 5;
        int i = 0;
        foreach (BasicDeliverEventArgs eventArgs in subscription)
        {
            if (++i == maxMessages)
            {
                Console.WriteLine("Took 5 messages");
                subscription.Ack(eventArgs);
                break;
            }
        }
    }
}
user1053237 asked Dec 13 '22 07:12



1 Answer

I'm assuming that you want to optimise loading of messages into the database by batching up groups of them into larger transactions rather than wearing the cost of a transaction per message. With the obligatory warning that doing so means large groups of messages can fail together, even if only one of them causes a problem, here's how you'd go about it...

Set QOS on the channel:

channel.BasicQos(0, 1000, false);

This will pre-fetch up to 1000 messages and block further deliveries until you ACK something. Note that it doesn't fetch in blocks of 1000. Rather, it ensures that a maximum of 1000 UNACK'ed messages are pre-fetched at any one time. Simulating block transfers is as simple as processing the 1000 messages first, and then ACK'ing them all in one go (BasicAck on the last delivery tag with multiple set to true). For this to work, the subscription has to be created with noAck set to false, as in your code.
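To make the batching idea concrete, here is a minimal sketch, assuming the older RabbitMQ.Client library that still ships the Subscription class used in the question. SaveBatch, BatchConsumer and BatchSize are hypothetical names introduced only for illustration, and SaveBatch is an empty placeholder for your own bulk insert.

```csharp
using System.Collections.Generic;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.MessagePatterns;

static class BatchConsumer
{
    const ushort BatchSize = 1000;

    // Hypothetical placeholder: do your bulk insert here, in one transaction.
    static void SaveBatch(List<byte[]> bodies) { }

    public static void Run(IModel channel, string queueName)
    {
        // At most BatchSize un-ACK'ed messages are outstanding on this channel.
        channel.BasicQos(0, BatchSize, false);

        // noAck = false, so nothing is removed from the queue until we ACK it.
        var subscription = new Subscription(channel, queueName, false);

        var batch = new List<byte[]>(BatchSize);
        foreach (BasicDeliverEventArgs e in subscription)
        {
            batch.Add(e.Body);
            if (batch.Count < BatchSize)
                continue;

            SaveBatch(batch);
            // multiple = true ACKs every delivery up to and including this tag.
            channel.BasicAck(e.DeliveryTag, true);
            batch.Clear();
        }
    }
}
```

Note that this version waits until a full batch has arrived before writing anything, so a partial batch sits un-ACK'ed while the queue is quiet. If the process dies before the ACK, the broker redelivers the whole batch, so the insert should tolerate duplicates.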

See here and here for a more authoritative explanation than mine.

One more point: You may want to flush the queue as soon as messages are available, even if you haven't made your quota of 1000 messages. You should be able to do this by calling channel.BasicGet() inside the foreach loop until it runs dry (it returns null when the queue is empty), and then delivering whatever you have (including the message you pulled out of subscription) to the database. Caveat: I haven't tried this myself, so I could be talking rubbish, but I think it'll work. The beauty of this method is that it pushes messages into the database immediately, without having to wait for a full batch of 1000 messages. If the database falls behind from handling too many small transactions, the prefetch backlog will simply fill up more between each cycle.
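A rough sketch of that flush-early variant follows, with the same caveat that it is untested against a live broker. It assumes the same older RabbitMQ.Client library, that BasicQos has already been set on the channel, and a hypothetical SaveBatch placeholder; EagerBatchConsumer and MaxBatch are likewise made-up names.

```csharp
using System.Collections.Generic;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.MessagePatterns;

static class EagerBatchConsumer
{
    const int MaxBatch = 1000;

    // Hypothetical placeholder: do your bulk insert here, in one transaction.
    static void SaveBatch(List<byte[]> bodies) { }

    public static void Run(IModel channel, string queueName)
    {
        var subscription = new Subscription(channel, queueName, false);

        // Blocks until at least one message is available.
        foreach (BasicDeliverEventArgs first in subscription)
        {
            var bodies = new List<byte[]> { first.Body };
            var tags = new List<ulong> { first.DeliveryTag };

            // Pull whatever else is ready right now, up to the batch limit.
            while (bodies.Count < MaxBatch)
            {
                BasicGetResult next = channel.BasicGet(queueName, false);
                if (next == null)
                    break; // the queue has run dry
                bodies.Add(next.Body);
                tags.Add(next.DeliveryTag);
            }

            SaveBatch(bodies);

            // ACK one by one: a multiple-ACK on the highest tag would also
            // cover messages still waiting in the subscription's prefetch buffer.
            foreach (ulong tag in tags)
                channel.BasicAck(tag, false);
        }
    }
}
```

One design point to be aware of: messages fetched with BasicGet can overtake messages already sitting in the subscription's prefetch buffer, so this sketch does not preserve queue order.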

Marcelo Cantos answered Dec 30 '22 19:12
