How do I use the RabbitMQ delayed message queue from PHP?

I'm trying to use the Delayed Message Queue for RabbitMQ from PHP, but my messages are simply disappearing.

I'm declaring the exchange with the following code:

$this->channel->exchange_declare(
    'delay',
    'x-delayed-message',
    false,  /* passive, create if exchange doesn't exist */
    true,   /* durable, persist through server reboots */
    false,  /* autodelete */
    false,  /* internal */
    false,  /* nowait */
    ['x-delayed-type' => ['S', 'direct']]);

I'm binding the queue with this code:

$this->channel->queue_declare(
    $queueName,
    false,  /* Passive */
    true,   /* Durable */
    false,  /* Exclusive */
    false   /* Auto Delete */
);
$this->channel->queue_bind($queueName, "delay", $queueName);

And I'm publishing a message with this code:

$msg = new AMQPMessage(json_encode($msgData), [
    'delivery_mode' => 2,
    'x-delay' => 5000]);
$this->channel->basic_publish($msg, 'delay', $queueName);

But the message doesn't get delayed; it's still immediately delivered. What am I missing?

Jesse Weigert asked Aug 31 '15 17:08


2 Answers

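x-delay is a message header, not a message property, so it has to be wrapped in an AMQPTable and passed as application_headers instead of sitting next to delivery_mode.

The message creation should be: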

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

// $data is the message body, e.g. json_encode($msgData) from the question
$msg = new AMQPMessage($data, [
    'delivery_mode' => 2, // make message persistent
    'application_headers' => new AMQPTable([
        'x-delay' => 5000, // delay in milliseconds
    ]),
]);
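
For reference, the pieces from the question can be combined with the header fix into one publisher. This is a minimal sketch rather than the asker's final code. It assumes php-amqplib is installed through Composer and that the rabbitmq_delayed_message_exchange plugin is enabled on the broker. The helper names buildDelayedMessage and publishDelayed are made up for illustration. As far as I know, php-amqplib discards property keys it does not recognise, which would explain why a top-level 'x-delay' never reaches the broker.

```php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

// Hypothetical helper: builds a persistent message carrying the x-delay header.
function buildDelayedMessage(string $body, int $delayMs): AMQPMessage
{
    return new AMQPMessage($body, [
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        // Headers travel in application_headers, wrapped in an AMQPTable.
        'application_headers' => new AMQPTable(['x-delay' => $delayMs]),
    ]);
}

// Hypothetical helper: declares the delayed exchange and the queue from the
// question, binds them, and publishes one delayed message.
function publishDelayed(AMQPChannel $channel, string $queueName, string $body, int $delayMs): void
{
    $channel->exchange_declare(
        'delay',
        'x-delayed-message',
        false, // passive
        true,  // durable
        false, // auto delete
        false, // internal
        false, // nowait
        new AMQPTable(['x-delayed-type' => 'direct'])
    );
    $channel->queue_declare($queueName, false, true, false, false);
    $channel->queue_bind($queueName, 'delay', $queueName);

    $channel->basic_publish(buildDelayedMessage($body, $delayMs), 'delay', $queueName);
}
```

With an open AMQPStreamConnection (for example to localhost:5672 with the default guest account, which is an assumption about your setup), calling publishDelayed($connection->channel(), 'jobs', json_encode(['id' => 1]), 5000) should route the message to the jobs queue about five seconds after publishing. The queue name jobs is only a placeholder.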
Toosick answered Nov 03 '22 22:11


This answer is for those who need delayed messages but do not want to dig into the details. You need only a few things to get it working:

Install an amqp interop compatible transport, for example enqueue/amqp-bunny, together with enqueue/amqp-tools:

composer require enqueue/amqp-bunny enqueue/amqp-tools

Create an AMQP context, set a delay strategy, and send delayed messages:

<?php
use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy;
use Enqueue\AmqpBunny\AmqpConnectionFactory;

$context = (new AmqpConnectionFactory('amqp://'))->createContext();
$context->setDelayStrategy(new RabbitMqDelayPluginDelayStrategy());

$queue = $context->createQueue('foo');
$context->declareQueue($queue);

$message = $context->createMessage('Hello world!');

$context->createProducer()
    ->setDeliveryDelay(5000) // 5 sec
    ->send($queue, $message)
;

By the way, this is not the only strategy available. There is also one based on RabbitMQ dead letter queues plus TTL (RabbitMqDlxDelayStrategy, also in enqueue/amqp-tools), and it can be used the same way.
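
The dead letter plus TTL idea can also be sketched without enqueue. The code below is not how the enqueue strategy is implemented; it only illustrates the underlying RabbitMQ mechanism, written against php-amqplib. The helper names and the ".delay" suffix for the holding queue are assumptions made for this example. A message is parked in a holding queue that nobody consumes from, and when its TTL expires the broker dead-letters it to the real queue.

```php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

// Hypothetical helper: arguments for a holding queue whose expired messages
// are dead-lettered through the default exchange ('') to $targetQueue.
function holdingQueueArguments(string $targetQueue): AMQPTable
{
    return new AMQPTable([
        'x-dead-letter-exchange' => '',
        'x-dead-letter-routing-key' => $targetQueue,
    ]);
}

// Hypothetical helper: publishes $body so that it reaches $targetQueue
// roughly $delayMs milliseconds later.
function publishViaDeadLetter(AMQPChannel $channel, string $targetQueue, string $body, int $delayMs): void
{
    $holdingQueue = $targetQueue . '.delay'; // assumed naming scheme

    $channel->queue_declare($targetQueue, false, true, false, false);
    $channel->queue_declare(
        $holdingQueue,
        false, // passive
        true,  // durable
        false, // exclusive
        false, // auto delete
        false, // nowait
        holdingQueueArguments($targetQueue)
    );

    // Per-message TTL is the 'expiration' property: milliseconds, as a string.
    $msg = new AMQPMessage($body, [
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        'expiration' => (string) $delayMs,
    ]);

    // Publish to the holding queue via the default exchange. Nothing consumes
    // from it, so the message waits there until its TTL runs out.
    $channel->basic_publish($msg, '', $holdingQueue);
}
```

One design caveat: RabbitMQ only expires messages once they reach the head of a queue, so mixing different delays in a single holding queue can keep a short-delay message stuck behind a long-delay one. Using one holding queue per delay value avoids that, which is one reason the plugin-based approach is often simpler.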

Maksim Kotlyar answered Nov 03 '22 21:11