With RabbitMQ, is there a way to consume messages similar to MSMQ, where one can pop 1000 messages from the queue, do the database inserts, and then continue from there?
I cannot seem to achieve this with a Subscription on a channel: doing a foreach over the BasicDeliverEventArgs in the Subscription, with an if statement to cap the number of messages I want to process at a given time, doesn't work.
Thanks in advance. The code below, however, still takes all 22k messages from the queue:
using (IConnection connection = factory.CreateConnection())
{
    using (IModel channel = connection.CreateModel())
    {
        channel.QueueDeclare("****", true, false, false, null);
        var subscription = new Subscription(channel, "****", false);

        int maxMessages = 5;
        int i = 0;
        foreach (BasicDeliverEventArgs eventArgs in subscription)
        {
            if (++i == maxMessages)
            {
                Console.WriteLine("Took 5 messages");
                subscription.Ack(eventArgs); // only ACKs this one message
                break;
            }
        }
    }
}
I'm assuming that you want to optimise loading of messages into the database by batching groups of them into larger transactions, rather than paying the cost of a transaction per message. With the obligatory warning that doing so means a whole batch of messages can fail together, even if only one of them causes a problem, here's how you'd go about it...
Set QOS on the channel:
channel.BasicQos(0, 1000, false);
This pre-fetches 1000 messages and stops further deliveries until you ACK something. Note that it doesn't fetch in blocks of 1000; rather, it ensures that a maximum of 1000 UNACKed messages are outstanding at any one time. Simulating block transfers is as simple as processing the 1000 messages first, then ACKing them all in one go.
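For illustration, here is a minimal sketch of that batched approach, reusing the Subscription API from the question. The queue name "****", the connection setup, and the InsertBatch helper (one database transaction per batch) are all assumptions for the sake of the example:

using System;
using System.Collections.Generic;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.MessagePatterns;

var factory = new ConnectionFactory(); // hostname/credentials omitted

using (IConnection connection = factory.CreateConnection())
using (IModel channel = connection.CreateModel())
{
    channel.QueueDeclare("****", true, false, false, null);

    // At most 1000 unACKed messages will be outstanding on this channel.
    channel.BasicQos(0, 1000, false);

    var subscription = new Subscription(channel, "****", false);
    var batch = new List<BasicDeliverEventArgs>(1000);

    foreach (BasicDeliverEventArgs eventArgs in subscription)
    {
        batch.Add(eventArgs);

        if (batch.Count == 1000)
        {
            InsertBatch(batch); // hypothetical: one DB transaction for the whole batch

            // multiple: true ACKs every delivery up to and including this tag,
            // which releases the prefetch window for the next 1000 messages.
            channel.BasicAck(eventArgs.DeliveryTag, true);
            batch.Clear();
        }
    }
}

ACKing with multiple: true releases the whole prefetch window at once, so the broker immediately pushes the next 1000 messages.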
See the RabbitMQ documentation on consumer prefetch for a more authoritative explanation than mine.
One more point: you may want to flush the queue as soon as messages are available, even if you haven't reached your quota of 1000 messages. You should be able to do this by calling channel.BasicGet() inside the foreach loop until it runs dry, and then delivering whatever you have (including the message you pulled out of the subscription) to the database. Caveat: I haven't tried this myself, so I could be talking rubbish, but I think it'll work. The beauty of this method is that it pushes messages into the database immediately, without waiting for a full batch of 1000 messages. If the database falls behind because it's handling too many small transactions, the prefetch backlog will simply fill up more between each cycle.
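Here is an untried sketch of that drain-as-you-go variant, under the same assumptions as above (3.x Subscription API, queue "****", hypothetical InsertBatch helper). BasicGet returns null when the queue is empty, and since delivery tags increase monotonically on a channel, a single BasicAck with multiple: true should cover both the pushed message and everything fetched with BasicGet:

using (IConnection connection = factory.CreateConnection())
using (IModel channel = connection.CreateModel())
{
    channel.BasicQos(0, 1000, false);
    var subscription = new Subscription(channel, "****", false);

    foreach (BasicDeliverEventArgs eventArgs in subscription)
    {
        // Start the batch with the message the subscription just delivered.
        var batch = new List<byte[]> { eventArgs.Body };
        ulong lastDeliveryTag = eventArgs.DeliveryTag;

        // Drain whatever is already sitting in the queue, capped at 1000.
        BasicGetResult result;
        while (batch.Count < 1000
               && (result = channel.BasicGet("****", false)) != null)
        {
            batch.Add(result.Body);
            lastDeliveryTag = result.DeliveryTag;
        }

        InsertBatch(batch); // hypothetical: one DB transaction per batch

        // One multiple: true ACK covers the consumed message and the
        // BasicGet fetches alike, since they share the channel's tag sequence.
        channel.BasicAck(lastDeliveryTag, true);
    }
}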