
Celery: how can I route a failed task to a dead-letter queue

I'm new to Celery and am trying to integrate it into my project, but I still can't figure out how Celery handles failed tasks. I'd like to keep all of them in an AMQP dead-letter queue.

According to the docs, raising Reject in a task that has acks_late enabled has the same effect as acking the message, and the docs then say a few words about dead-letter queues.

So I added a custom default queue to my Celery config:

celery_app.conf.update(CELERY_ACCEPT_CONTENT=['application/json'],
                       CELERY_TASK_SERIALIZER='json',
                       CELERY_QUEUES=[CELERY_QUEUE,
                                      CELERY_DLX_QUEUE],
                       CELERY_DEFAULT_QUEUE=CELERY_QUEUE_NAME,
                       CELERY_DEFAULT_EXCHANGE=CELERY_EXCHANGE
                       )

and my kombu objects look like this:

CELERY_DLX_EXCHANGE = Exchange(CELERY_DLX_EXCHANGE_NAME, type='direct')
CELERY_DLX_QUEUE = Queue(CELERY_DLX_QUEUE_NAME,
                         exchange=CELERY_DLX_EXCHANGE,
                         routing_key='celery-dlq')

DEAD_LETTER_CELERY_OPTIONS = {'x-dead-letter-exchange': CELERY_DLX_EXCHANGE_NAME,
                              'x-dead-letter-routing-key': 'celery-dlq'}

CELERY_EXCHANGE = Exchange(CELERY_EXCHANGE_NAME,
                           arguments=DEAD_LETTER_CELERY_OPTIONS,
                           type='direct')

CELERY_QUEUE = Queue(CELERY_QUEUE_NAME,
                     exchange=CELERY_EXCHANGE,
                     routing_key='celery-q')

And the task I'm executing is:

class HookTask(Task):
    acks_late = True

    def run(self, ctx, data):
        logger.info('{0} starting {1.name}[{1.request.id}]'.format(self.__class__.__name__.upper(), self))
        self.hook_process(ctx, data)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Log the exception itself; exc.message does not exist on Python 3.
        logger.error('task_id %s failed, message: %s', task_id, exc)

    def hook_process(self, t_ctx, body):
        # Build context
        ctx = TaskContext(self.request, t_ctx)
        logger.info('Task_id: %s, handling request %s', ctx.task_id, ctx.req_id)
        raise Reject('no_reason', requeue=False)

I ran a small test with it, but raising a Reject exception produced no result: nothing showed up in the dead-letter queue.

Now I'm wondering whether it's a good idea to force failed tasks into the dead-letter queue by overriding Task.on_failure. I think this would work, but it doesn't seem very clean, because according to what I've read Celery should do this on its own.

Thanks for your help.

asked Jun 29 '16 22:06 by onizukaek



1 Answer

I think you should not pass arguments=DEAD_LETTER_CELERY_OPTIONS to CELERY_EXCHANGE. In RabbitMQ, x-dead-letter-exchange and x-dead-letter-routing-key are queue arguments, so pass them to CELERY_QUEUE instead, as queue_arguments=DEAD_LETTER_CELERY_OPTIONS.
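To illustrate why the placement matters, here is a toy in-memory model of dead-lettering. It is only a sketch: ToyBroker and its methods are hypothetical names invented for this illustration, not RabbitMQ, kombu, or Celery APIs, and it models only the one rule relevant here, namely that the dead-letter settings are looked up on the queue a message was rejected from.

```python
# Toy model only: ToyBroker is a hypothetical class, not a real broker API.
from collections import defaultdict


class ToyBroker:
    def __init__(self):
        self.queue_args = {}                # queue name -> arguments dict
        self.bindings = defaultdict(dict)   # exchange -> {routing key: queue}
        self.messages = defaultdict(list)   # queue -> [(routing key, body)]

    def declare_queue(self, name, exchange, routing_key, arguments=None):
        self.queue_args[name] = dict(arguments or {})
        self.bindings[exchange][routing_key] = name

    def publish(self, exchange, routing_key, body):
        queue = self.bindings.get(exchange, {}).get(routing_key)
        if queue is not None:
            self.messages[queue].append((routing_key, body))

    def reject(self, queue, requeue=False):
        routing_key, body = self.messages[queue].pop(0)
        if requeue:
            self.messages[queue].append((routing_key, body))
            return
        # Dead-letter settings are read from the *queue* arguments.
        args = self.queue_args[queue]
        dlx = args.get('x-dead-letter-exchange')
        if dlx is None:
            return  # no dead-letter exchange configured: message is dropped
        key = args.get('x-dead-letter-routing-key', routing_key)
        self.publish(dlx, key, body)


dl_args = {'x-dead-letter-exchange': 'dlx',
           'x-dead-letter-routing-key': 'dead_letter'}

broker = ToyBroker()
broker.declare_queue('dead_letter', 'dlx', 'dead_letter')
broker.declare_queue('default', 'default', 'default', arguments=dl_args)
broker.declare_queue('plain', 'default', 'plain')  # no dead-letter arguments

broker.publish('default', 'default', 'task-1')
broker.publish('default', 'plain', 'task-2')
broker.reject('default')  # dead-lettered
broker.reject('plain')    # dropped

dead = [body for _key, body in broker.messages['dead_letter']]
print(dead)
```

Only the message rejected from the queue that carries the dead-letter arguments ends up in dead_letter; the one rejected from the plain queue is discarded.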

The following example is what I did and it works fine:

from celery import Celery
from kombu import Exchange, Queue
from celery.exceptions import Reject

app = Celery(
    'tasks',
    broker='amqp://guest@localhost:5672//',
    backend='redis://localhost:6379/0')

dead_letter_queue_option = {
    'x-dead-letter-exchange': 'dlx',
    'x-dead-letter-routing-key': 'dead_letter'
}

default_exchange = Exchange('default', type='direct')
dlx_exchange = Exchange('dlx', type='direct')

default_queue = Queue(
    'default',
    default_exchange,
    routing_key='default',
    queue_arguments=dead_letter_queue_option)
dead_letter_queue = Queue(
    'dead_letter', dlx_exchange, routing_key='dead_letter')

app.conf.task_queues = (default_queue, dead_letter_queue)

app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'default'
app.conf.task_default_routing_key = 'default'


@app.task
def add(x, y):
    return x + y


@app.task(acks_late=True)
def div(x, y):
    try:
        z = x / y
        return z
    except ZeroDivisionError as exc:
        raise Reject(exc, requeue=False)

Once the queue has been created, the 'Features' column for it in the RabbitMQ management UI should show the DLX (dead-letter-exchange) and DLK (dead-letter-routing-key) labels.

NOTE: If the queues already exist in RabbitMQ, delete them first. Celery won't delete an existing queue and re-create it with the new arguments.
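One way to delete a stale queue is through the RabbitMQ management HTTP API, which offers DELETE on /api/queues/{vhost}/{name}. The sketch below only builds the request. It assumes the management plugin is enabled on localhost:15672 with the default guest/guest account; the helper name build_queue_delete_request is made up for this example.

```python
import base64
import urllib.parse
import urllib.request


def build_queue_delete_request(queue, vhost='/', host='localhost', port=15672,
                               user='guest', password='guest'):
    """Build (but do not send) a DELETE request for a single queue."""
    # The vhost and queue name must be percent-encoded; '/' becomes %2F.
    url = 'http://{}:{}/api/queues/{}/{}'.format(
        host, port,
        urllib.parse.quote(vhost, safe=''),
        urllib.parse.quote(queue, safe=''))
    request = urllib.request.Request(url, method='DELETE')
    token = base64.b64encode('{}:{}'.format(user, password).encode()).decode()
    request.add_header('Authorization', 'Basic ' + token)
    return request


request = build_queue_delete_request('default')
print(request.get_method(), request.full_url)
# To actually delete the queue, send it:
# urllib.request.urlopen(request)
```

Stop the worker before deleting, then restart it so the queue is declared again with the dead-letter arguments.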

answered Sep 21 '22 03:09 by Hengfeng Li