
C# RabbitMQ wait for one message for specified timeout?

Tags:

c#

rabbitmq

The solutions in "RabbitMQ Wait for a message with a timeout" and "Wait for a single RabbitMQ message with a timeout" don't seem to work: the official C# library has no next-delivery method, and QueueingBasicConsumer is deprecated, so it just throws NotSupportedException everywhere.

How can I wait for a single message from a queue with a specified timeout?

PS

It can be done with Basic.Get(), yes, but polling for messages at a specified interval is a bad solution (excess traffic, excess CPU).

Update

EventingBasicConsumer, by its implementation, does NOT support immediate cancellation. Even if you call BasicCancel at some point, and even if you specify a prefetch count through BasicQos, it will still fetch in frames, and those frames can contain multiple messages. So it is not good for single-task execution. Don't bother: it just doesn't work with single messages.

eocron asked May 22 '17 10:05


1 Answer

There are many ways to do this. For example, you can use EventingBasicConsumer together with ManualResetEvent, like this (this is just for demonstration purposes; it is better to use one of the methods below):

using System;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

var factory = new ConnectionFactory();
using (var connection = factory.CreateConnection()) {
    using (var channel = connection.CreateModel()) {
        // signal that the consumer callback sets when a message arrives
        using (var signal = new ManualResetEvent(false)) {
            var consumer = new EventingBasicConsumer(channel);
            byte[] messageBody = null;
            consumer.Received += (sender, args) => {
                // process your message or store it for later
                messageBody = args.Body;
                // wake up the waiting thread
                signal.Set();
            };
            // start consuming; the second argument (false) means deliveries are not
            // auto-acknowledged, so ack them with channel.BasicAck once processed
            channel.BasicConsume("your.queue", false, consumer);
            // wait until a message is received or the timeout is reached
            bool timeout = !signal.WaitOne(TimeSpan.FromSeconds(10));
            // cancel subscription
            channel.BasicCancel(consumer.ConsumerTag);
            if (timeout) {
                // timeout reached - do what you need in this case
                throw new TimeoutException("timeout");
            }

            // at this point messageBody has been received
        }
    }
}

As you stated in the comments, if you expect multiple messages on the same queue, this is not the best way. Well, it's not the best way in any case; I included it just to demonstrate the use of ManualResetEvent in case the library itself does not provide timeout support.
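
The signal-and-wait idea is not tied to RabbitMQ, so it can be packaged as a small helper. Below is a minimal sketch that swaps ManualResetEvent for TaskCompletionSource, which carries the value along with the signal. The helper name TryWaitForFirst and the simulated delivery are assumptions made for illustration; neither is part of RabbitMQ.Client. The sketch uses C# top-level statements so it can run without a broker.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper (not part of RabbitMQ.Client): waits for the first value
// pushed through `subscribe`, or returns false once the timeout elapses.
static bool TryWaitForFirst<T>(Action<Action<T>> subscribe, TimeSpan timeout, out T value)
{
    var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
    // TrySetResult ignores every call after the first, so later deliveries
    // cannot overwrite the value that was captured.
    subscribe(item => tcs.TrySetResult(item));
    if (tcs.Task.Wait(timeout))
    {
        value = tcs.Task.Result;
        return true;
    }
    value = default;
    return false;
}

// Simulated delivery after 100 ms. With RabbitMQ, `subscribe` would instead
// attach the handler to consumer.Received and pass it args.Body.
bool got = TryWaitForFirst<string>(
    handler => Task.Delay(100).ContinueWith(_ => handler("hello")),
    TimeSpan.FromSeconds(2),
    out var body);
Console.WriteLine(got ? body : "timeout");
```

The helper only waits; cancelling the subscription and acknowledging the delivery still have to be done by the caller, as in the snippet above.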

If you are doing RPC (remote procedure call, request-reply), you can use SimpleRpcClient together with SimpleRpcServer on the server side. The client side will look like this:

var client = new SimpleRpcClient(channel, "your.queue");
client.TimeoutMilliseconds = 10 * 1000;
client.TimedOut += (sender, args) => {
    // do something on timeout
};                    
var reply = client.Call(myMessage); // will return reply or null if timeout reached

An even simpler way: use the basic Subscription class (it uses the same EventingBasicConsumer internally, but supports timeouts, so you don't need to implement them yourself), like this:

var sub = new Subscription(channel, "your.queue");
BasicDeliverEventArgs reply;
if (!sub.Next(10 * 1000, out reply)) {
     // timeout
}
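
In case the Subscription class is not available in the client version you use, a similar "next with timeout" call can be approximated with a thread-safe buffer from the base class library. This is only a sketch under that assumption: the variable names and the simulated delivery are hypothetical, and it does not reproduce everything Subscription does (acknowledgements, for instance, are left out).

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Thread-safe buffer between the consumer callback and the waiting caller.
var inbox = new BlockingCollection<byte[]>();

// With RabbitMQ, this Add call would sit inside the consumer.Received handler.
// Here a background task simulates a single delivery after 100 ms.
_ = Task.Run(async () =>
{
    await Task.Delay(100);
    inbox.Add(new byte[] { 1, 2, 3 });
});

// TryTake blocks until an item arrives or the timeout (in milliseconds) elapses.
if (inbox.TryTake(out var first, 2000))
    Console.WriteLine($"received {first.Length} bytes");
else
    Console.WriteLine("timeout");

// Nothing else was queued, so a second, shorter wait times out.
bool again = inbox.TryTake(out _, 50);
Console.WriteLine(again ? "received another message" : "timeout");
```

Because deliveries are buffered, messages that arrive between two waits are kept for the next TryTake call, unlike the single-slot signal approach.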

Evk answered Nov 01 '22 11:11