I'm trying to use the Delayed Message Queue for RabbitMQ from PHP, but my messages are simply disappearing.
I'm declaring the exchange with the following code:
$this->channel->exchange_declare(
'delay',
'x-delayed-message',
false, /* passive, create if exchange doesn't exist */
true, /* durable, persist through server reboots */
false, /* autodelete */
false, /* internal */
false, /* nowait */
['x-delayed-type' => ['S', 'direct']]);
I'm binding the queue with this code:
$this->channel->queue_declare(
$queueName,
false, /* Passive */
true, /* Durable */
false, /* Exclusive */
false /* Auto Delete */
);
$this->channel->queue_bind($queueName, "delay", $queueName);
And I'm publishing a message with this code:
$msg = new AMQPMessage(json_encode($msgData), [
'delivery_mode' => 2,
'x-delay' => 5000]);
$this->channel->basic_publish($msg, 'delay', $queueName);
But the message doesn't get delayed; it's still immediately delivered. What am I missing?
To delay a message, the user must publish it with the x-delay header, which accepts an integer representing the number of milliseconds the message should be delayed by RabbitMQ. It's worth noting that delay in this context means delaying message routing to queues or other exchanges.
The RabbitMQ delayed exchange plugin is used to implement a wait time between when a message reaches the exchange and when it is delivered to a queue. Every time a message is published, an offset in milliseconds can be specified.
RabbitMQ is an open-source message broker software written in Erlang. It is commonly called message-oriented middleware that implements the AMQP (Advanced Message Queuing Protocol).
From here,
The message creation should be
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$msg = new AMQPMessage($data,
array(
'delivery_mode' => 2, # make message persistent
'application_headers' => new AMQPTable([
'x-delay' => 5000
])
)
);
The answer is for those who need message delaying but does not want to dig into details. You need only a few things to get it working:
Install amqp interop compatible transport for example enqueue/amqp-bunny
and enqueue/amqp-tools
.
composer require enqueue/amqp-bunny enqueue/amqp-tools
Create amqp context, add a delay strategy and send delayed messages:
<?php
use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy;
use Enqueue\AmqpBunny\AmqpConnectionFactory;
$context = (new AmqpConnectionFactory('amqp://'))->createContext();
$context->setDelayStrategy(new RabbitMqDelayPluginDelayStrategy())
$queue = $context->createQueue('foo');
$context->declareQueue($queue);
$message = $context->createMessage('Hello world!');
$context->createProducer()
->setDeliveryDelay(5000) // 5 sec
->send($queue, $message)
;
By the way, this not the only strategy available. there is one based on RabbitMQ dead letter queues + ttl. It could be used the same way.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With