Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RabbitMQ Wait for a message with a timeout

Tags:

rabbitmq

I'd like to send a message to a RabbitMQ server and then wait for a reply message (on a "reply-to" queue). Of course, I don't want to wait forever in case the application processing these messages is down - there needs to be a timeout. It sounds like a very basic task, yet I can't find a way to do this. I've now run into this problem with Java API.

like image 702
Sameek Mishra Avatar asked Sep 21 '10 12:09

Sameek Mishra


People also ask

How do I set timeout on RabbitMQ?

The default connection timeout for the RabbitMQ connection factory is 600 seconds (at least in the Java client API), hence your 10 minutes. You can change this by specifying to the connection factory your timeout of choice.

How long do messages stay in RabbitMQ?

In standard queues, messages are retained for at least 72 hours and will be deleted 72 hours later.

Does RabbitMQ delete message after consumed?

you are telling RabbitMQ to automatically acknowledge the message when it is consumed. acknowledging a message tells RabbitMQ that it has been taken care of and RabbitMQ can delete it now.

What does redelivered mean in RabbitMQ?

If a message is delivered to a consumer and then requeued, either automatically by RabbitMQ or by the same or different consumer, RabbitMQ will set the redelivered flag on it when it is delivered again. This is a hint that a consumer may have seen this message before.


2 Answers

The RabbitMQ Java client library now supports a timeout argument to its QueueConsumer.nextDelivery() method.

For instance, the RPC tutorial uses the following code:

channel.basicPublish("", requestQueueName, props, message.getBytes());

while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    if (delivery.getProperties().getCorrelationId().equals(corrId)) {
        response = new String(delivery.getBody());
        break;
    }
}

Now, you can use consumer.nextDelivery(1000) to wait for maximum one second. If the timeout is reached, the method returns null.

channel.basicPublish("", requestQueueName, props, message.getBytes());

while (true) {
    // Use a timeout of 1000 milliseconds
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(1000);

    // Test if delivery is null, meaning the timeout was reached.
    if (delivery != null &&
        delivery.getProperties().getCorrelationId().equals(corrId)) {
        response = new String(delivery.getBody());
        break;
    }
}
like image 61
Jean-Sébastien Pédron Avatar answered Oct 22 '22 18:10

Jean-Sébastien Pédron


com.rabbitmq.client.QueueingConsumer has a nextDelivery(long timeout) method, which will do what you want. However, this has been deprecated. Writing your own timeout isn't so hard, although it may be better to have an ongoing thread and a list of in-time identifiers, rather than adding and removing consumers and associated timeout threads all the time.

Edit to add: Noticed the date on this after replying!

like image 41
Dan Avatar answered Oct 22 '22 16:10

Dan