Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to set per-message expiration (TTL) in Celery?

It is possible to publish messages into a RabbitMQ queue with an expiration TTL: such messages will expire once the TTL is done and (if a dead-letter queue is setup,) removed to the dead-letter queue.

But is it possible to specify such per-message TTL using Celery?

Note that I'm not looking for a way to specify task-expiration but rather message expiration: I want my messages to spend (a configurable) amount of time in the queue before finally getting picked up @ the dead-letter queue.

TIA.

like image 868
FuzzyAmi Avatar asked Nov 18 '14 09:11

FuzzyAmi


1 Answers

Short introduction: Expiration vs Expires

RabbitMQ does support per-message TTL (as well as TTL for the queue), the behavior is documented here: https://www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers. The trick is to set the expiration Message Property (https://www.rabbitmq.com/publishers.html#message-properties) when the message is published (in milliseconds).

Celery on the other hand allows you to set the expires parameter (https://docs.celeryproject.org/en/stable/reference/celery.app.task.html) in seconds or as a datetime. The difference from the native RabbitMQ functionality is that the message remains in the queue after expiration. The expired message is delivered to the worker, which then reads the expires header to determine that the message has expired and rejects the message.

tl;dr: expiration != expires

How to pass a message property in Celery

This method is not documented in Celery. I figured it out by trial and error because I wanted a native TTL myself.

The send_task method (celery.app.base.Celery.send_task), which is called for example by apply_async, accepts the **options parameter. All **options unknown to Celery are then passed in the celery.app.amqp.Queues->send_task_message( ... ) method as **kwargs and then as message properties.

So if we can set the message property, there is nothing easier than setting the native expiration:

my_awesome_task.apply_async(args=(11,), expiration=42)

RabbitMQ Management console

  • Note that Celery automatically converts 42 seconds to 42000 milliseconds (which is correct).
  • Expiration (in properties) and Expires (in headers) can be combined, the two functionalities are not affected in any way.
like image 59
illagrenan Avatar answered Sep 18 '22 00:09

illagrenan