I'm new to Celery and am trying to integrate this task queue into my project, but I still can't figure out how Celery handles failed tasks. I'd like to keep all of them in an AMQP dead-letter queue.
According to the docs, raising Reject in a task that has acks_late enabled has the same effect as acking the message, and the docs then say a few words about dead-letter queues.
So I added a custom default queue to my Celery config:
celery_app.conf.update(CELERY_ACCEPT_CONTENT=['application/json'],
                       CELERY_TASK_SERIALIZER='json',
                       CELERY_QUEUES=[CELERY_QUEUE,
                                      CELERY_DLX_QUEUE],
                       CELERY_DEFAULT_QUEUE=CELERY_QUEUE_NAME,
                       CELERY_DEFAULT_EXCHANGE=CELERY_EXCHANGE
                       )
and my kombu objects look like this:
CELERY_DLX_EXCHANGE = Exchange(CELERY_DLX_EXCHANGE_NAME, type='direct')
CELERY_DLX_QUEUE = Queue(CELERY_DLX_QUEUE_NAME, exchange=CELERY_DLX_EXCHANGE,
                         routing_key='celery-dlq')
DEAD_LETTER_CELERY_OPTIONS = {'x-dead-letter-exchange': CELERY_DLX_EXCHANGE_NAME,
                              'x-dead-letter-routing-key': 'celery-dlq'}
CELERY_EXCHANGE = Exchange(CELERY_EXCHANGE_NAME,
                           arguments=DEAD_LETTER_CELERY_OPTIONS,
                           type='direct')
CELERY_QUEUE = Queue(CELERY_QUEUE_NAME,
                     exchange=CELERY_EXCHANGE,
                     routing_key='celery-q')
And the task I'm executing is:
class HookTask(Task):
    acks_late = True

    def run(self, ctx, data):
        logger.info('{0} starting {1.name}[{1.request.id}]'.format(self.__class__.__name__.upper(), self))
        self.hook_process(ctx, data)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.error('task_id %s failed, message: %s', task_id, exc.message)

    def hook_process(self, t_ctx, body):
        # Build context
        ctx = TaskContext(self.request, t_ctx)
        logger.info('Task_id: %s, handling request %s', ctx.task_id, ctx.req_id)
        raise Reject('no_reason', requeue=False)
I ran a small test with it, but raising a Reject exception gave no results.
Now I'm wondering whether it's a good idea to force failed tasks into the dead-letter queue by overriding Task.on_failure. I think it would work, but it doesn't seem very clean, because from what I've read Celery should handle this on its own.
Thanks for your help.
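I think you should not add arguments=DEAD_LETTER_CELERY_OPTIONS to CELERY_EXCHANGE. You should add it to CELERY_QUEUE instead, with queue_arguments=DEAD_LETTER_CELERY_OPTIONS. In RabbitMQ, x-dead-letter-exchange and x-dead-letter-routing-key are queue arguments, so setting them on an exchange does not enable dead-lettering.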
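Applied to the objects from the question, the change might look like the sketch below. The string values of the *_NAME constants are placeholders I made up, since the question does not show them. The only real change is that the dead-letter options move from the exchange's arguments to the queue's queue_arguments.

```python
from kombu import Exchange, Queue

# Placeholder names: the question does not show the real values.
CELERY_EXCHANGE_NAME = 'celery-exchange'
CELERY_QUEUE_NAME = 'celery-queue'
CELERY_DLX_EXCHANGE_NAME = 'celery-dlx'
CELERY_DLX_QUEUE_NAME = 'celery-dlq'

DEAD_LETTER_CELERY_OPTIONS = {
    'x-dead-letter-exchange': CELERY_DLX_EXCHANGE_NAME,
    'x-dead-letter-routing-key': 'celery-dlq',
}

# Dead-letter side: a plain exchange and queue with a matching routing key.
CELERY_DLX_EXCHANGE = Exchange(CELERY_DLX_EXCHANGE_NAME, type='direct')
CELERY_DLX_QUEUE = Queue(CELERY_DLX_QUEUE_NAME,
                         exchange=CELERY_DLX_EXCHANGE,
                         routing_key='celery-dlq')

# Work side: the exchange takes no special arguments; the queue carries
# the dead-letter options.
CELERY_EXCHANGE = Exchange(CELERY_EXCHANGE_NAME, type='direct')
CELERY_QUEUE = Queue(CELERY_QUEUE_NAME,
                     exchange=CELERY_EXCHANGE,
                     routing_key='celery-q',
                     queue_arguments=DEAD_LETTER_CELERY_OPTIONS)
```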
The following example is what I did and it works fine:
from celery import Celery
from kombu import Exchange, Queue
from celery.exceptions import Reject

app = Celery(
    'tasks',
    broker='amqp://guest@localhost:5672//',
    backend='redis://localhost:6379/0')

dead_letter_queue_option = {
    'x-dead-letter-exchange': 'dlx',
    'x-dead-letter-routing-key': 'dead_letter'
}

default_exchange = Exchange('default', type='direct')
dlx_exchange = Exchange('dlx', type='direct')

default_queue = Queue(
    'default',
    default_exchange,
    routing_key='default',
    queue_arguments=dead_letter_queue_option)
dead_letter_queue = Queue(
    'dead_letter', dlx_exchange, routing_key='dead_letter')

app.conf.task_queues = (default_queue, dead_letter_queue)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'default'
app.conf.task_default_routing_key = 'default'


@app.task
def add(x, y):
    return x + y


@app.task(acks_late=True)
def div(x, y):
    try:
        z = x / y
        return z
    except ZeroDivisionError as exc:
        raise Reject(exc, requeue=False)
After the queue has been created, the 'Features' column of the queue list in the RabbitMQ management UI should show the DLX (dead-letter-exchange) and DLK (dead-letter-routing-key) labels.
NOTE: If you have already created the queues in RabbitMQ, delete them first. Celery won't delete an existing queue and re-create it with the new arguments.
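If the RabbitMQ management plugin is enabled, deleting the old queue and checking its dead-letter arguments can also be scripted against the plugin's HTTP API. The sketch below is only an illustration: the address localhost:15672 and the guest/guest credentials are assumed defaults, and the helper names are mine, not part of Celery or RabbitMQ.

```python
import base64
import json
import urllib.parse
import urllib.request

API_ROOT = 'http://localhost:15672/api'  # assumed default management address


def queue_request(queue, method='GET', vhost='/', user='guest', password='guest'):
    """Build (but do not send) a management API request for one queue."""
    # Vhost and queue name are path segments, so '/' must become '%2F'.
    path = '/queues/{}/{}'.format(
        urllib.parse.quote(vhost, safe=''), urllib.parse.quote(queue, safe=''))
    token = base64.b64encode('{}:{}'.format(user, password).encode()).decode()
    return urllib.request.Request(
        API_ROOT + path, method=method,
        headers={'Authorization': 'Basic ' + token})


def dead_letter_settings(queue_info):
    """Pick the dead-letter arguments out of a queue description."""
    arguments = queue_info.get('arguments', {})
    return {key: value for key, value in arguments.items()
            if key.startswith('x-dead-letter-')}


def delete_queue(queue, **options):
    """Delete a stale queue so it can be declared again with new arguments."""
    with urllib.request.urlopen(queue_request(queue, 'DELETE', **options)):
        pass


def fetch_dead_letter_settings(queue, **options):
    """Return the dead-letter arguments RabbitMQ reports for a queue."""
    with urllib.request.urlopen(queue_request(queue, **options)) as response:
        return dead_letter_settings(json.load(response))
```

With the example configuration from this answer, I would call delete_queue('default') before restarting the worker. Once the worker has declared the queue again, fetch_dead_letter_settings('default') should report the dlx exchange and the dead_letter routing key.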