How to ensure task execution order per user using Celery, RabbitMQ and Django?

I'm running Django, Celery and RabbitMQ. What I'm trying to achieve is to ensure that tasks related to one user are executed in order (specifically, one at a time; I don't want task concurrency per user):

  • whenever a new task is added for a user, it should depend on the most recently added task. Additional functionality might include not adding a task to the queue at all if a task of the same type is already queued for this user and has not yet started.

I've done some research and:

  • I couldn't find a way to link a newly created task with an already queued one in Celery itself; chains only seem to link tasks at the time they are created (see the sketch after this list).
  • I think both features could be implemented with a custom RabbitMQ message handler, though that might be hard to get right.
  • I've also read about celery-tasktree, and this might be the easiest way to ensure execution order, but how do I link a new task with a tree or queue that has already been applied_async? Is there any way to implement the no-duplicate functionality using this package?
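
For context, a Celery chain links its signatures when the chain is built, before anything reaches the broker, which is why it can't be attached to a task that has already been published. A minimal sketch (the app, broker URL and task names below are placeholders of my own):

from celery import Celery, chain

app = Celery("proj", broker="amqp://")

@app.task
def process_order(user_id):
    ...

@app.task
def send_receipt(order_result):
    ...

# Both signatures are linked at creation time and the whole chain is
# sent to the broker together; there is no way to append a signature
# to a task that has already been published.
chain(process_order.s(42), send_receipt.s()).apply_async()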

Edit: there is also this "lock" example in the Celery cookbook. The concept is fine, but I can't see a way to make it work as intended in my case: if the lock for a user can't be acquired, the task has to be retried, and that means pushing it to the end of the queue.
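
For reference, the cookbook-style lock boils down to something like the sketch below, assuming Django's cache as the lock backend (the task name and the timeout are my own placeholders). The retry branch is exactly the problem described above: the task is re-published and ends up at the back of the queue.

from celery import shared_task
from django.core.cache import cache

LOCK_EXPIRE = 60 * 5  # assumed lock timeout in seconds

@shared_task(bind=True)
def process_user_task(self, user_id):
    lock_id = "user-lock-{}".format(user_id)
    # cache.add is atomic: it succeeds only if the key does not exist yet
    if cache.add(lock_id, "locked", LOCK_EXPIRE):
        try:
            # ... do the actual per-user work for user_id here ...
            pass
        finally:
            cache.delete(lock_id)
    else:
        # Another task for this user is running; retrying re-publishes
        # the task, i.e. it goes to the back of the queue.
        raise self.retry(countdown=10)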

What would be the best course of action here?

asked Apr 30 '15 by Pearley

1 Answer

If you configure the Celery workers so that they can only execute one task at a time (see the worker_concurrency setting), then you can enforce the serialization you need on a per-user basis. Using a method like

NUMBER_OF_CELERY_WORKERS = 10  # one queue (and one dedicated worker) per bucket

def get_task_queue_for_user(user):
    # Hash the user id into a fixed set of queue names, so every task
    # for a given user always lands on the same queue.
    return "user_queue_{}".format(user.id % NUMBER_OF_CELERY_WORKERS)
to pick the task queue from the user id, every task for a given user will be routed to the same queue. The workers would then need to be configured so that each one consumes from only a single queue.
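
As a rough sketch of the routing side (the task name and user object are placeholders of my own), the queue is chosen at call time:

# Send the task to the per-user queue picked by get_task_queue_for_user.
process_user_data.apply_async(
    args=[user.id],
    queue=get_task_queue_for_user(user),
)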

It would play out like this:

  1. User 49 triggers a task

  2. The task is sent to user_queue_9

  3. When the one and only Celery worker that is listening to user_queue_9 is ready to consume a new task, the task is executed
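
Each queue then needs its own dedicated worker, started with a concurrency of 1 so that tasks on it run strictly one at a time; roughly like this (the application module and node name are placeholders):

# one worker per queue, each limited to a single task at a time
celery -A proj worker -Q user_queue_9 -c 1 -n worker9@%h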

This is a hacky answer though, because

  • requiring just a single Celery worker for each queue is a brittle system -- if that worker stops, the whole queue stops

  • the workers run inefficiently, since each one is pinned to a single queue and sits idle whenever that queue is empty

answered by Mark Chackerian