Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to delay? - php-amqplib

I would like to know how to delay with Amqpphplib.

I used this great coffee script tutorial :

https://github.com/jamescarr/rabbitmq-scheduled-delivery

but it doesn't seems to work with PHP-amqplib.

The message expires as I want, but it seems that "x-dead-letter-exchange" don't do the work. I used RabbitMQ management console and I see all queue creation and deletion in live. But my message do go to the immediate queue after expiring. I use RabbitMQ 3.2.3 version, PHP-amqplib 2.2.* version.

Here is my code :

Connection class :

class Connection
{
/**
 * @var $ch
 */
public $ch;

/**
 * @var $consumer_tag
 */
public $consumer_tag;

/**
 * @var $exchange
 */
public $exchange;

/**
 * @var $conn
 */
public $conn;

public function __construct($host, $port, $user, $password, $vhost)
{

    $this->exchange = 'immediate';
    $this->queue = 'right.now.queue';
    $this->consumer_tag = 'consumer';


    $this->conn = new AMQPConnection($host, $port, $user, $password, $vhost);
    $this->ch = $this->conn->channel();

    $this->ch->exchange_declare($this->exchange, 'direct', false, true, false);

    $this->ch->queue_declare($this->queue, false, true, false, false, false);

    $this->ch->queue_bind($this->queue, $this->exchange);


}

public function createDelayedQueue ($name, $delay_seconds) {
    $this->ch->queue_declare($name, false, false, false, true, true, array(
        "x-dead-letter-exchange" => array("S", $this->exchange),
        "x-message-ttl" => array("I", $delay_seconds*1000),
        "x-expires" => array("I", $delay_seconds*1000+1000)
    ));
}
}

Publish code

$name = 'send.later.'.$ts;
$amqp->createDelayedQueue($name, 2);
$msg = new AMQPMessage($msg_body, array('content_type' => 'text/plain', 'delivery_mode' => 2));
$amqp->ch->basic_publish($msg);

Consumer code

$amqp = $this->getContainer()->get('amqp_connexion');

    $amqp->ch->basic_consume($amqp->queue, $amqp->consumer_tag, false, false, false, false, function ($msg) {

        echo $msg->body;
        echo "\n--------\n";
    });

    $output->writeln('Listening '.$amqp->queue.'...');

    // Loop as long as the channel has callbacks registered
    while (count($amqp->ch->callbacks)) {
        $amqp->ch->wait();
    }
like image 691
LucasC Avatar asked Feb 21 '14 18:02

LucasC


2 Answers

I just wrote a simplified working version for php:

/////// simplified ///////

// include the AMQPlib Classes || use an autoloader

// queue/exchange names
$queueRightNow = 'right.now.queue';
$exchangeRightNow = 'right.now.exchange';
$queueDelayed5sec = 'delayed.five.seconds.queue';
$exchangeDelayed5sec = 'delayed.five.seconds.exchange';

$delay = 5; // delay in seconds

// create connection
$AMQPConnection = new \PhpAmqpLib\Connection\AMQPConnection('localhost',5672,'guest','guest');

// create a channel
$channel = $AMQPConnection->channel();

// create the right.now.queue, the exchange for that queue and bind them together
$channel->queue_declare($queueRightNow);
$channel->exchange_declare($exchangeRightNow, 'direct');
$channel->queue_bind($queueRightNow, $exchangeRightNow);

// now create the delayed queue and the exchange
$channel->queue_declare(
        $queueDelayed5sec,
        false,
        false,
        false,
        true,
        true,
        array(
            'x-message-ttl' => array('I', $delay*1000),   // delay in seconds to milliseconds
            "x-expires" => array("I", $delay*1000+1000),
            'x-dead-letter-exchange' => array('S', $exchangeRightNow) // after message expiration in delay queue, move message to the right.now.queue
        )
);
$channel->exchange_declare($exchangeDelayed5sec, 'direct');
$channel->queue_bind($queueDelayed5sec, $exchangeDelayed5sec);

// now create a message und publish it to the delayed exchange
$msg = new \PhpAmqpLib\Message\AMQPMessage(
    time(),
    array(
        'delivery_mode' => 2
    )
);
$channel->basic_publish($msg,$exchangeDelayed5sec);


// consume the delayed message
$consumeCallback = function(\PhpAmqpLib\Message\AMQPMessage $msg) {
    $messagePublishedAt = $msg->body;
    echo 'seconds between publishing and consuming: '
        . (time()-$messagePublishedAt) . PHP_EOL;
};
$channel->basic_consume($queueRightNow, '', false, true, false, false, $consumeCallback);

// start consuming
while (count($channel->callbacks) > 0) {
    $channel->wait();
}
like image 160
byMike Avatar answered Oct 04 '22 06:10

byMike


If you choose amqp interop based transport you won't need to dig into details at all. Only a few things to do:

Install enqueue/amqp-lib (btw you can use other transports based on amqp ext and a great bunny lib) transport and enqueue/amqp-tools.

composer require enqueue/amqp-lib enqueue/amqp-tools

Create amqp context, add a delay strategy and send delayed messages:

<?php
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
use Enqueue\AmqpBunny\AmqpConnectionFactory;

$context = (new AmqpConnectionFactory('amqp://'))->createContext();
$context->setDelayStrategy(new RabbitMqDlxDelayStrategy())

$queue = $context->createQueue('foo');
$context->declareQueue($queue);

$message = $context->createMessage('Hello world!');

$context->createProducer()
    ->setDeliveryDelay(5000) // 5 sec
    ->send($queue, $message)
;

By the way, this not this only strategy available. there is one based on RabbitMQ delay plugin. It could be used the same way.

like image 23
Maksim Kotlyar Avatar answered Oct 04 '22 06:10

Maksim Kotlyar