I'm running Django, Celery and RabbitMQ. What I'm trying to achieve is to ensure that tasks related to one user are executed in order (specifically, one at a time; I don't want task concurrency per user).
I've done some research and:
applied_async
" task_tree or queue? Is there any way that I could implement that additional no-duplicate functionality using this package?
Edit: There is also the "lock" example in the Celery cookbook. The concept is fine, but I can't see a way to make it work as intended in my case: if I can't acquire the lock for a user, the task has to be retried, and that means pushing it to the end of the queue.
What would be the best course of action here?
If you configure the Celery workers so that each one can execute only one task at a time (see the worker_concurrency setting), you can enforce the concurrency you need on a per-user basis. Using a method like
    NUMBER_OF_CELERY_WORKERS = 10

    def get_task_queue_for_user(user):
        # The same user id always maps to the same queue name.
        return "user_queue_{}".format(user.id % NUMBER_OF_CELERY_WORKERS)
to get the task queue from the user id, every task for a given user is sent to the same queue. Each worker must then be configured to consume tasks from a single queue only.
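As a rough sketch of the worker side, the snippet below prints one worker command line per queue, each limited to a single process and a single queue. The app name `proj` and the `worker{i}@%h` node names are placeholders I made up, not something from the question; `-Q`, `--concurrency` and `-n` are the standard `celery worker` options.

```python
NUMBER_OF_CELERY_WORKERS = 10  # same value as in the snippet above


def worker_commands(app_name="proj"):
    """Build one `celery worker` command line per user queue.

    `app_name` is a hypothetical Celery app module; replace it with your own.
    """
    commands = []
    for i in range(NUMBER_OF_CELERY_WORKERS):
        queue = "user_queue_{}".format(i)
        # --concurrency=1: the worker runs one task at a time.
        # -Q <queue>: the worker consumes from this queue only.
        # -n: gives each worker a distinct node name on the same host.
        commands.append(
            "celery -A {app} worker -Q {queue} --concurrency=1 -n worker{i}@%h".format(
                app=app_name, queue=queue, i=i
            )
        )
    return commands


for command in worker_commands():
    print(command)
```

Each printed line would be started as its own process (for example under a process supervisor), so that exactly one worker is attached to each queue.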
It would play out like this:
User 49 triggers a task
The task is sent to user_queue_9
When the one and only Celery worker that is listening to user_queue_9 is ready to consume a new task, the task is executed
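The sending side of those steps could look roughly like this. `dispatch_for_user` is a helper name I made up for illustration; it only assumes that `task` is a Celery task (anything with `apply_async`) and relies on the `queue` option of `apply_async` to pick the destination queue.

```python
NUMBER_OF_CELERY_WORKERS = 10


def get_task_queue_for_user(user):
    # The same user id always maps to the same queue name.
    return "user_queue_{}".format(user.id % NUMBER_OF_CELERY_WORKERS)


def dispatch_for_user(task, user, *args, **kwargs):
    """Send `task` to the queue that belongs to `user`.

    Hypothetical helper: `task` is expected to be a Celery task object,
    and `user` any object with an integer `id` attribute.
    """
    return task.apply_async(
        args=args,
        kwargs=kwargs,
        queue=get_task_queue_for_user(user),
    )
```

With this in place, something like `dispatch_for_user(some_task, request.user, some_argument)` would send every task for user 49 to user_queue_9, where the single worker on that queue picks them up one by one.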
This is a hacky answer though, because
requiring just a single celery worker for each queue is a brittle system -- if the celery worker stops, the whole queue stops
the workers run inefficiently: a worker sits idle whenever its own queue is empty, even if other queues are backed up