
Ensuring the order of tasks from different sources in Celery

May I ask a question about Celery?

I have several writers that each write a task every X minutes. Each task requires that the previous task from the same writer has already completed. The system works well as is, because X minutes is much longer than the few seconds each task takes.

Now, however, a writer may send two or three tasks at the same time. Celery + RabbitMQ will distribute these tasks to different workers, which causes trouble.

I've searched, but the answers I found involve blocking a worker with a lock (for example, using Redis) until the other task has finished. That is not possible here, because I have fewer workers than writers.

I need something like N queues for N writers, with Celery able to respect the order within each queue. I will have literally thousands of writers, so I can't create that many workers.

Example: writers A, B, and C; tasks A1, A2, ...; and only one worker.

I receive, at the "same" time: A1, A2, B1, C1, B2, C2, A3, B3, C3.

Celery should create the queues A (1-2-3), B (1-2-3), C (1-2-3)

and then send task A1. It doesn't matter whether the next task is A2, B1, or C1, but it must not be A3, B2, B3, C2, or C3.
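To make the constraint concrete, here is a minimal sketch of a checker for the ordering described in the example. It is only an illustration, not part of Celery: the function name `respects_writer_order` and the assumption that task labels look like a writer name followed by a sequence number starting at 1 (A1, B2, ...) are hypothetical.

```python
import re


def respects_writer_order(sequence):
    """Return True if, for every writer, tasks appear as 1, 2, 3, ... in order.

    Assumes labels such as "A1" or "B12": letters for the writer,
    digits for that writer's task number (hypothetical naming scheme).
    """
    next_expected = {}  # writer -> task number that must come next
    for label in sequence:
        match = re.fullmatch(r"([A-Za-z]+)(\d+)", label)
        if match is None:
            raise ValueError(f"unrecognised task label: {label!r}")
        writer, number = match.group(1), int(match.group(2))
        if number != next_expected.get(writer, 1):
            return False  # a task ran before its predecessor from the same writer
        next_expected[writer] = number + 1
    return True
```

Any interleaving between writers passes, for example A1, B1, A2, C1, ..., while a sequence in which A3 runs before A2 is rejected.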

I hope I explained this well.

Thanks!

Alby87 asked Feb 21 '15 19:02


1 Answer

I think you need to create one worker per queue to enforce ordering like that. Otherwise the worker will just use a first-in, first-out approach to handling tasks. You can create as many queues as you want and configure which of those queues each worker consumes from. Pass the -Q option when starting the worker to set its queues, as discussed in the Workers Guide.

celery -A my_project worker -l info -Q A

Then you can set up global mappings that define which queue each task goes to, as described in the Routing Guide.

CELERY_ROUTES = {
    'my_app.tasks.task_a1': {'queue': 'A'},
    'my_app.tasks.task_a2': {'queue': 'A'},
    'my_app.tasks.task_b1': {'queue': 'B'},
    'my_app.tasks.task_c1': {'queue': 'C'},
}
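With thousands of writers, listing one route per task by hand may not scale. One possible variation, which is not what the mapping above does, is to hash each writer onto a fixed, small set of queues so that all tasks from one writer always land in the same queue. The sketch below assumes a Celery version that accepts router functions in its routing setting (Celery 4 and later), and that the writer id is the first positional argument of every task. The names `NUM_QUEUES`, `queue_for_writer`, `route_by_writer`, and the `writer-queue-N` naming scheme are all hypothetical.

```python
import zlib

# Hypothetical number of ordered queues. Each queue is meant to be consumed
# by exactly one worker, started with -Q <queue name> and --concurrency=1.
NUM_QUEUES = 4


def queue_for_writer(writer_id, num_queues=NUM_QUEUES):
    """Map a writer to a fixed queue name (hypothetical naming scheme)."""
    # crc32 gives the same value in every process, unlike the built-in
    # hash() for strings, which is randomised per interpreter run.
    bucket = zlib.crc32(str(writer_id).encode("utf-8")) % num_queues
    return f"writer-queue-{bucket}"


def route_by_writer(name, args, kwargs, options, task=None, **kw):
    """Router function; assumes the writer id is the first positional argument."""
    if args:
        return {"queue": queue_for_writer(args[0])}
    return None  # no writer id: fall through to the default routing


# Assumed wiring, where `app` is your Celery instance:
# app.conf.task_routes = (route_by_writer,)
```

This only keeps tasks from one writer in one queue. Ordering still depends on each queue having a single consumer that processes one task at a time, and a retried task could still run after its successors.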

Alternatively, you can specify the queue at the time you submit each task instance based on the Calling Tasks Guide.

task_a1.apply_async(queue='A')

Chris Ward answered Nov 04 '22 13:11