Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Rabbitmq retrieve multiple messages using single synchronous call using .NET

Is there a way to receive multiple message using a single synchronous call using .NET?
I've seen question and I've found java class com.rabbitmq.client.QueueingConsumer, but I haven't found such client class in .NET namespaces (RabbitMQ.Client, RabbitMQ.Client.Events)

like image 299
zampotex Avatar asked Aug 31 '15 10:08

zampotex


People also ask

How do you consume multiple messages in RabbitMQ?

To increase the performance and to consume more messages at a time, do as follows: Open the "RabbitMQ connection" and go to the Event sources tab. In Advanced Settings > "Other Attributes:", add “concurrentConsumers” property. For instance: concurrentConsumers=10.

Is it possible that multiple consumers of a RabbitMQ queue get the same message?

RabbitMQ has a plugin for consistent hash exchange. Using that exchange, and one consumer per queue, we can achieve message order with multiple consumers. The hash exchange distributes routing keys among queues, instead of messages among queues. This means all messages with the same routing key will go the same queue.

How do I get all messages from RabbitMQ?

exports = (connection, queue) => { init(connection, queue); return { getMessages: (queueName, cleanQueue) => new Promise((resolve) => { let messages = []; let i = 1; getChannel(). then((ch) => { ch. consume(queueName, (msg) => { messages. push(msg); console.

Can you have multiple subscriptions on a single message RabbitMQ queue?

It is not possible. A particular message from a queue cannot be consumed by more than one consumer. Remember in AMQPAMQPThe Advanced Message Queuing Protocol (AMQP) is an open standard application layer protocol for message-oriented middleware. The defining features of AMQP are message orientation, queuing, routing (including point-to-point and publish-and-subscribe), reliability and security.https://en.wikipedia.org › wiki › Advanced_Message_Queuing...Advanced Message Queuing Protocol - Wikipedia, messages are always consumed from queue. The producer publishes a message to an exchange.


2 Answers

You can retrieve as many messages as you want using the BasicQoS.PrefetchCount:

var model = _rabbitConnection.CreateModel();
// Configure the Quality of service for the model. Below is how what each setting means.
// BasicQos(0="Dont send me a new message untill I’ve finshed",  _fetchSize = "Send me N messages at a time", false ="Apply to this Model only")
model.BasicQos(0, fetchSize, false);

Note: if you set fetchSize = 20 then it will retrieve the first 20 messages that are currently in the queue. But, once the queue has been emptied it won't wait for 20 messages to build up in the queue, it will start consuming them as fast as possible, grabbing up to 20 at a time.

Hopefully that makes sense.

like image 65
jhilden Avatar answered Sep 28 '22 22:09

jhilden


Thanks for answers, but I've found the class which I looked for: RabbitMQ.Client.QueueingBasicConsumer
Simple implementation is:

IEnumerable<T> Get(int maxBatchCount, int getMessageTimeout, int getBatchTimeout)
{
    var result = new List<T>();
    var startTime = DateTime.Now;
    while (result.Count < maxBatchCount)
    {
        var deliverEventArgs = new BasicDeliverEventArgs();
        if ((_consumer as QueueingBasicConsumer).Queue.Dequeue(GetMessageTimeout, out deliverEventArgs))
        {
            var entry = ContractSerializer.Deserialize<T>(deliverEventArgs.Body);
            result.Add(entry);
            _queue.Channel.BasicAck(deliverEventArgs.DeliveryTag, false);
        }
        else
            break;

        if ((DateTime.Now - startTime) >= TimeSpan.FromMilliseconds(getBatchTimeout))
            break;
    }
    return result;
}

Well, of course, you can use Environment.TickCount instead of DateTime.Now

like image 30
zampotex Avatar answered Sep 28 '22 21:09

zampotex