Solutions in RabbitMQ Wait for a message with a timeout and Wait for a single RabbitMQ message with a timeout don't seem to work because there is no next delivery method in official C# library and QueueingBasicConsumer is depricated, so it just throws NotSupportedException everywhere.
How I can wait for single message from queue for specified timeout?
PS
It can be done through Basic.Get(), yes, but well, it is bad solution to pull messages in specififed interval (excess traffic, excess CPU).
Update
EventingBasicConsumer by implmenetation NOT SUPPORT immediate cancelation. Even if you call BasicCancel at some point, even if you specify prefetch through BasicQos - it will still fetch in Frames and those frames can contain multiple messages. So, it is not good for single task execution. Don't bother - it just don't work with single messages.
There are many ways to do this. For example you can use EventingBasicConsumer
together with ManualResetEvent
, like this (that's just for demonstration purposes - better use one of the methods below):
var factory = new ConnectionFactory();
using (var connection = factory.CreateConnection()) {
using (var channel = connection.CreateModel()) {
// setup signal
using (var signal = new ManualResetEvent(false)) {
var consumer = new EventingBasicConsumer(channel);
byte[] messageBody = null;
consumer.Received += (sender, args) => {
messageBody = args.Body;
// process your message or store for later
// set signal
signal.Set();
};
// start consuming
channel.BasicConsume("your.queue", false, consumer);
// wait until message is received or timeout reached
bool timeout = !signal.WaitOne(TimeSpan.FromSeconds(10));
// cancel subscription
channel.BasicCancel(consumer.ConsumerTag);
if (timeout) {
// timeout reached - do what you need in this case
throw new Exception("timeout");
}
// at this point messageBody is received
}
}
}
As you stated in comments - if you expect multiple messages on the same queue, it's not the best way. Well it's not the best way in any case, I included it just to demonstrate the use of ManualResetEvent
in case library itself does not provide timeouts support.
If you are doing RPC (remote procedure call, request-reply) - you can use SimpleRpcClient
together with SimpleRpcServer
on server side. Client side will look like this:
var client = new SimpleRpcClient(channel, "your.queue");
client.TimeoutMilliseconds = 10 * 1000;
client.TimedOut += (sender, args) => {
// do something on timeout
};
var reply = client.Call(myMessage); // will return reply or null if timeout reached
Even more simple way: use basic Subscription
class (it uses the same EventingBasicConsumer
internally, but supports timeouts so you don't need to implement yourself), like this:
var sub = new Subscription(channel, "your.queue");
BasicDeliverEventArgs reply;
if (!sub.Next(10 * 1000, out reply)) {
// timeout
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With