It is possible to publish messages into a RabbitMQ queue with an expiration TTL: such messages expire once the TTL elapses and (if a dead-letter queue is set up) are moved to the dead-letter queue.
But is it possible to specify such per-message TTL using Celery?
Note that I'm not looking for a way to specify task expiration but rather message expiration: I want my messages to spend a configurable amount of time in the queue before finally getting picked up from the dead-letter queue.
TIA.
RabbitMQ does support per-message TTL (as well as TTL for the whole queue). The behavior is documented here: https://www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers. The trick is to set the expiration message property (https://www.rabbitmq.com/publishers.html#message-properties) when the message is published; the value is in milliseconds.
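As a rough sketch of what setting that property looks like outside Celery, the snippet below uses the pika client. That client is my choice for illustration, not something this answer depends on. The helper name ttl_to_expiration, the queue name you pass in, and the already-open channel are all assumptions.

```python
def ttl_to_expiration(ttl_seconds):
    """Convert a TTL in seconds to the AMQP `expiration` property value.

    RabbitMQ expects the property as a string holding a non-negative
    integer number of milliseconds.
    """
    if ttl_seconds < 0:
        raise ValueError("TTL must be non-negative")
    return str(int(ttl_seconds * 1000))


def publish_with_ttl(channel, queue_name, body, ttl_seconds):
    """Publish `body` to `queue_name` with a per-message TTL.

    `channel` is assumed to be an open pika channel, for example one
    obtained from pika.BlockingConnection(...).channel().
    """
    import pika  # third-party client, imported lazily

    properties = pika.BasicProperties(expiration=ttl_to_expiration(ttl_seconds))
    channel.basic_publish(
        exchange="",             # default exchange routes by queue name
        routing_key=queue_name,
        body=body,
        properties=properties,
    )
```

With this, publish_with_ttl(channel, "waiting", b"hello", 60) would ask the broker to drop or dead-letter the message after roughly 60 seconds if nobody consumed it first.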
Celery, on the other hand, allows you to set the expires parameter (https://docs.celeryproject.org/en/stable/reference/celery.app.task.html), in seconds or as a datetime. The difference from the native RabbitMQ functionality is that the message remains in the queue after expiration. The expired message is still delivered to a worker, which then reads the expires header, determines that the message has expired, and discards the task instead of running it.
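To make the difference concrete, here is a simplified model of the worker-side check described above. It is not Celery's actual code: the function name is_expired and the plain-dict headers are made up for illustration, and I am assuming the expires header carries an ISO 8601 timestamp.

```python
from datetime import datetime, timedelta, timezone


def is_expired(headers, now=None):
    """Return True if a delivered message is past its `expires` header.

    The broker knows nothing about this header, so the check can only
    happen after the message has already been handed to a consumer.
    """
    expires = headers.get("expires")
    if expires is None:
        return False  # no deadline was set by the publisher
    deadline = datetime.fromisoformat(expires)
    if deadline.tzinfo is None:
        # Assumption: naive timestamps are treated as UTC.
        deadline = deadline.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return now >= deadline


# A message whose deadline passed one second before it was delivered.
delivered_at = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
stale = {"expires": (delivered_at - timedelta(seconds=1)).isoformat()}
print(is_expired(stale, delivered_at))  # True
```

The point of the model is only that the decision is made by the consumer, so the message never leaves the queue on its own and never reaches a dead-letter queue through TTL.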
tl;dr: expiration != expires
This method is not documented in Celery. I figured it out by trial and error because I wanted a native TTL myself.
The send_task method (celery.app.base.Celery.send_task), which is called, for example, by apply_async, accepts the **options parameter. All **options unknown to Celery are then passed to the celery.app.amqp.Queues->send_task_message( ... ) method as **kwargs and from there on as message properties.
So if we can set the message property, there is nothing easier than setting the native expiration:
my_awesome_task.apply_async(args=(11,), expiration=42)
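Here is a slightly fuller sketch that ties the call above back to the dead-letter setup from the question. The queue and exchange names ("waiting", "dead_letter"), the task name, the broker URL, and the helper functions are all assumptions for illustration. One thing worth checking on your version: as far as I can tell, kombu takes the expiration option in seconds and converts it to milliseconds itself, so expiration=42 would mean 42 seconds rather than 42 ms.

```python
def dead_letter_arguments(exchange_name, routing_key):
    """Queue arguments telling RabbitMQ where expired messages should go."""
    return {
        "x-dead-letter-exchange": exchange_name,
        "x-dead-letter-routing-key": routing_key,
    }


def make_app(broker_url="amqp://guest:guest@localhost:5672//"):
    """Build a Celery app whose 'waiting' queue dead-letters into 'dead_letter'."""
    from celery import Celery          # third-party, imported lazily
    from kombu import Exchange, Queue

    app = Celery("ttl_demo", broker=broker_url)
    app.conf.task_queues = [
        # Holding queue: messages should sit here until their TTL elapses.
        Queue(
            "waiting",
            Exchange("waiting", type="direct"),
            routing_key="waiting",
            queue_arguments=dead_letter_arguments("dead_letter", "dead_letter"),
        ),
        # Queue the workers are meant to consume from.
        Queue(
            "dead_letter",
            Exchange("dead_letter", type="direct"),
            routing_key="dead_letter",
        ),
    ]
    return app


def send_delayed(app, delay_seconds):
    """Publish a task to 'waiting' with a native per-message TTL."""
    return app.send_task(
        "tasks.my_awesome_task",   # hypothetical task name
        args=(11,),
        queue="waiting",
        expiration=delay_seconds,  # unknown to Celery, so passed on as a message property
    )
```

For this to behave as intended the worker must not consume the holding queue, so it would need to be started with something like -Q dead_letter. I have not verified this layout against every Celery version, so treat it as a starting point.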