I would like to know how to delay with Amqpphplib.
I used this great coffee script tutorial :
https://github.com/jamescarr/rabbitmq-scheduled-delivery
but it doesn't seems to work with PHP-amqplib.
The message expires as I want, but it seems that "x-dead-letter-exchange" don't do the work. I used RabbitMQ management console and I see all queue creation and deletion in live. But my message do go to the immediate queue after expiring. I use RabbitMQ 3.2.3 version, PHP-amqplib 2.2.* version.
Here is my code :
Connection class :
class Connection
{
/**
* @var $ch
*/
public $ch;
/**
* @var $consumer_tag
*/
public $consumer_tag;
/**
* @var $exchange
*/
public $exchange;
/**
* @var $conn
*/
public $conn;
public function __construct($host, $port, $user, $password, $vhost)
{
$this->exchange = 'immediate';
$this->queue = 'right.now.queue';
$this->consumer_tag = 'consumer';
$this->conn = new AMQPConnection($host, $port, $user, $password, $vhost);
$this->ch = $this->conn->channel();
$this->ch->exchange_declare($this->exchange, 'direct', false, true, false);
$this->ch->queue_declare($this->queue, false, true, false, false, false);
$this->ch->queue_bind($this->queue, $this->exchange);
}
public function createDelayedQueue ($name, $delay_seconds) {
$this->ch->queue_declare($name, false, false, false, true, true, array(
"x-dead-letter-exchange" => array("S", $this->exchange),
"x-message-ttl" => array("I", $delay_seconds*1000),
"x-expires" => array("I", $delay_seconds*1000+1000)
));
}
}
Publish code
$name = 'send.later.'.$ts;
$amqp->createDelayedQueue($name, 2);
$msg = new AMQPMessage($msg_body, array('content_type' => 'text/plain', 'delivery_mode' => 2));
$amqp->ch->basic_publish($msg);
Consumer code
$amqp = $this->getContainer()->get('amqp_connexion');
$amqp->ch->basic_consume($amqp->queue, $amqp->consumer_tag, false, false, false, false, function ($msg) {
echo $msg->body;
echo "\n--------\n";
});
$output->writeln('Listening '.$amqp->queue.'...');
// Loop as long as the channel has callbacks registered
while (count($amqp->ch->callbacks)) {
$amqp->ch->wait();
}
I just wrote a simplified working version for php:
/////// simplified ///////
// include the AMQPlib Classes || use an autoloader
// queue/exchange names
$queueRightNow = 'right.now.queue';
$exchangeRightNow = 'right.now.exchange';
$queueDelayed5sec = 'delayed.five.seconds.queue';
$exchangeDelayed5sec = 'delayed.five.seconds.exchange';
$delay = 5; // delay in seconds
// create connection
$AMQPConnection = new \PhpAmqpLib\Connection\AMQPConnection('localhost',5672,'guest','guest');
// create a channel
$channel = $AMQPConnection->channel();
// create the right.now.queue, the exchange for that queue and bind them together
$channel->queue_declare($queueRightNow);
$channel->exchange_declare($exchangeRightNow, 'direct');
$channel->queue_bind($queueRightNow, $exchangeRightNow);
// now create the delayed queue and the exchange
$channel->queue_declare(
$queueDelayed5sec,
false,
false,
false,
true,
true,
array(
'x-message-ttl' => array('I', $delay*1000), // delay in seconds to milliseconds
"x-expires" => array("I", $delay*1000+1000),
'x-dead-letter-exchange' => array('S', $exchangeRightNow) // after message expiration in delay queue, move message to the right.now.queue
)
);
$channel->exchange_declare($exchangeDelayed5sec, 'direct');
$channel->queue_bind($queueDelayed5sec, $exchangeDelayed5sec);
// now create a message und publish it to the delayed exchange
$msg = new \PhpAmqpLib\Message\AMQPMessage(
time(),
array(
'delivery_mode' => 2
)
);
$channel->basic_publish($msg,$exchangeDelayed5sec);
// consume the delayed message
$consumeCallback = function(\PhpAmqpLib\Message\AMQPMessage $msg) {
$messagePublishedAt = $msg->body;
echo 'seconds between publishing and consuming: '
. (time()-$messagePublishedAt) . PHP_EOL;
};
$channel->basic_consume($queueRightNow, '', false, true, false, false, $consumeCallback);
// start consuming
while (count($channel->callbacks) > 0) {
$channel->wait();
}
If you choose amqp interop based transport you won't need to dig into details at all. Only a few things to do:
Install enqueue/amqp-lib
(btw you can use other transports based on amqp ext and a great bunny lib) transport and enqueue/amqp-tools
.
composer require enqueue/amqp-lib enqueue/amqp-tools
Create amqp context, add a delay strategy and send delayed messages:
<?php
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
use Enqueue\AmqpBunny\AmqpConnectionFactory;
$context = (new AmqpConnectionFactory('amqp://'))->createContext();
$context->setDelayStrategy(new RabbitMqDlxDelayStrategy())
$queue = $context->createQueue('foo');
$context->declareQueue($queue);
$message = $context->createMessage('Hello world!');
$context->createProducer()
->setDeliveryDelay(5000) // 5 sec
->send($queue, $message)
;
By the way, this not this only strategy available. there is one based on RabbitMQ delay plugin. It could be used the same way.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With