I have create a simple publisher and a consumer which subscribes on the queue using basic.consume
.
My consumer acknowledges the messages when the job runs without an exception. Whenever I run into an exception I don´t ack the message and return early. Only the acknowledged messages disappear from the queue, so that´s working correctly.
Now I want the consumer to pick up the failed messages again, but the only way to reconsume those messages is by restarting the consumer.
How do I need to approach this use case?
Setup code
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName('my-exchange');
$exchange->setType('fanout');
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declare();
$queue->bind('my-exchange');
Consumer code
$queue->consume(array($this, 'callback'));
public function callback(AMQPEnvelope $msg)
{
try {
//Do some business logic
} catch (Exception $ex) {
//Log exception
return;
}
return $queue->ack($msg->getDeliveryTag());
}
Producer code
$exchange->publish('message');
If a consumer fails to acknowledge messages, the RabbitMQ will keep sending new messages until the prefetch value set for the associated channel is equal to the number of RabbitMQ Unacked Messages count.
In order to consume messages there has to be a queue. When a new consumer is added, assuming there are already messages ready in the queue, deliveries will start immediately. The target queue can be empty at the time of consumer registration. In that case first deliveries will happen when new messages are enqueued.
you are telling RabbitMQ to automatically acknowledge the message when it is consumed. acknowledging a message tells RabbitMQ that it has been taken care of and RabbitMQ can delete it now. set autoAck to false if you want to manually acknowledge the message after you are done processing it.
The easy way to do that is to use autoack=true , so the message acknowledged automatically once it is consumed.
If message was not acknowledged and application fails, it will be redelivered automatically and redelivered
property on envelope will be set to true
(unless you consume them with no-ack = true
flag).
UPD:
You have to nack
message with redelivery flag in your catch block
try {
//Do some business logic
} catch (Exception $ex) {
//Log exception
return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE);
}
Beware infinitely nacked messages while redelivery count doesn't implemented in RabbitMQ and in AMQP protocol at all.
If you doesn't want to mess with such messages and simply want to add some delay you may want to add some sleep()
or usleep()
before nack
method call, but it is not a good idea at all.
There are multiple techniques to deal with cycle redeliver problem:
1. Rely on Dead Letter Exchanges
2. Use per message or per queue TTL
Examples (note, that for queue ttl we pass only number and for message ttl - anything that will be numeric string):
2.1 Per message ttl:
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');
$exchange->publish(
'message at ' . microtime(true),
null,
AMQP_NOPARAM,
array(
'expiration' => '1000'
)
);
2.2. Per queue ttl:
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->setArgument('x-message-ttl', 1000);
$queue->declareQueue();
$queue->bind('my-exchange');
$exchange->publish('message at ' . microtime(true));
3. Hold redelivers count or left redelivers number (aka hop limit or ttl in IP stack) in message body or headers
Code:
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');
$exchange->publish(
'message at ' . microtime(true),
null,
AMQP_NOPARAM,
array(
'headers' => array(
'ttl' => 100
)
)
);
$queue->consume(
function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) {
$headers = $msg->getHeaders();
echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' ';
echo $msg->getDeliveryTag(), ' ';
echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' ';
echo $msg->getBody(), PHP_EOL;
try {
//Do some business logic
throw new Exception('business logic failed');
} catch (Exception $ex) {
//Log exception
if (isset($headers['ttl'])) {
// with ttl logic
if ($headers['ttl'] > 0) {
$headers['ttl']--;
$exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers));
}
return $queue->ack($msg->getDeliveryTag());
} else {
// without ttl logic
return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue
}
}
return $queue->ack($msg->getDeliveryTag());
}
);
There are may be some other ways to better control message redelivers flow.
Conclusion: there are no silver bullet solution. You have to decide what solution fit your need the best or find out something other, but don't forget to share it here ;)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With