Is it possible to send a message via RabbitMQ with some delay? For example, I want to expire a client session after 30 minutes, so I would send a message that is only processed after 30 minutes.
Queues in RabbitMQ are ordered collections of messages. Messages are enqueued and dequeued (delivered to consumers) in the FIFO manner. FIFO ordering is not guaranteed for priority and sharded queues.
From RabbitMQ release 2.7.0, messages are always held in the queue in publication order, even in the presence of requeueing or channel closure. With release 2.7.0 and later it is still possible for individual consumers to observe messages out of order if the queue has multiple subscribers.
In order to avoid losing messages on the RabbitMQ (as opposed to application) side, queues and messages must be able to cope with RabbitMQ node restarts, node and hardware failures. With some messaging protocols supported by RabbitMQ, applications control durability of queues and messages.
Applications can subscribe to have RabbitMQ push enqueued messages (deliveries) to them. This is done by registering a consumer (subscription) on a queue. After a subscription is in place, RabbitMQ will begin delivering messages. For each delivery a user-provided handler will be invoked.
There are two approaches you can try:
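Old approach: Set a TTL (time to live) on each message or on the queue (via a queue argument or a policy) and configure a dead-letter exchange (DLX) for that queue. Publish to this holding queue, which has no consumers. Once the TTL expires, RabbitMQ dead-letters the message through the DLX into the main queue, where your listener can process it.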
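The TTL plus dead-letter setup can be sketched as follows. This is a minimal sketch, not a definitive implementation: the queue names session.wait and session.expired and the helper waitQueueArgs are hypothetical, and the broker calls appear only as comments because they need the RabbitMQ Java client and a running broker. The argument keys x-message-ttl, x-dead-letter-exchange and x-dead-letter-routing-key are the standard RabbitMQ queue arguments.

```java
import java.util.HashMap;
import java.util.Map;

public class DelayQueueArgs {

    // Builds the arguments for a holding queue that nobody consumes from.
    // Messages wait there for delayMillis, then RabbitMQ dead-letters them
    // to deadLetterExchange using deadLetterRoutingKey.
    static Map<String, Object> waitQueueArgs(int delayMillis,
                                             String deadLetterExchange,
                                             String deadLetterRoutingKey) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", delayMillis);
        args.put("x-dead-letter-exchange", deadLetterExchange);
        args.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        return args;
    }

    public static void main(String[] argv) {
        // 30 minutes, dead-lettered via the default exchange ("") to the
        // hypothetical work queue "session.expired".
        Map<String, Object> args = waitQueueArgs(30 * 60 * 1000, "", "session.expired");
        System.out.println("TTL in ms: " + args.get("x-message-ttl"));

        // With the RabbitMQ Java client and an open channel (assumed), the
        // wiring would look roughly like this:
        //   channel.queueDeclare("session.expired", true, false, false, null);
        //   channel.queueDeclare("session.wait", true, false, false, args);
        //   channel.basicPublish("", "session.wait", null, body);
        // The listener consumes from "session.expired" only.
    }
}
```

One caveat with this design: a message is only dead-lettered once it reaches the head of the holding queue, so mixing different per-message TTLs in one holding queue can delay short-TTL messages stuck behind long-TTL ones. A single queue-level TTL, as above, avoids that.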
Newer approach: RabbitMQ now offers the RabbitMQ Delayed Message Plugin, which achieves the same result. The plugin is available for RabbitMQ 3.5.8 and later.
You can declare an exchange with the type x-delayed-message and then publish messages with the custom header x-delay, which expresses the delay in milliseconds. The message will be delivered to the respective queues after x-delay milliseconds.
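import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;

// "channel" is an open com.rabbitmq.client.Channel.
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000); // delay in milliseconds
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);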
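The publish call above assumes that my-exchange was already declared with the x-delayed-message type. A minimal sketch of that declaration step follows. The helper names delayedExchangeArgs and delayHeaders are hypothetical, and the broker call is shown only as a comment since it needs the RabbitMQ Java client, a running broker, and the plugin enabled (rabbitmq-plugins enable rabbitmq_delayed_message_exchange). The x-delayed-type argument tells the plugin which routing behaviour (direct, fanout, topic, ...) the exchange should use once the delay has elapsed.

```java
import java.util.HashMap;
import java.util.Map;

public class DelayedExchangeArgs {

    // Arguments for declaring an exchange of type "x-delayed-message".
    // routingType is the ordinary exchange type to emulate after the delay.
    static Map<String, Object> delayedExchangeArgs(String routingType) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", routingType);
        return args;
    }

    // Message headers carrying the per-message delay in milliseconds.
    static Map<String, Object> delayHeaders(int delayMillis) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-delay", delayMillis);
        return headers;
    }

    public static void main(String[] argv) {
        Map<String, Object> args = delayedExchangeArgs("direct");
        Map<String, Object> headers = delayHeaders(30 * 60 * 1000); // 30 minutes
        System.out.println("x-delayed-type: " + args.get("x-delayed-type"));
        System.out.println("x-delay (ms): " + headers.get("x-delay"));

        // With an open channel (assumed), the declaration would be:
        //   channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
        // Queues are then bound to "my-exchange" as with any other exchange.
    }
}
```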
More here: git
With the release of RabbitMQ v2.8, scheduled delivery is now available but as an indirect feature: http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html