On my website, users can update their profile manually whenever they want, or it is updated automatically once a day.
This task is now distributed with Celery.
But I have a "problem":
Every day, for the automatic update, a job puts ALL users (about 6k) on the queue:
    from celery import group
    from tasks import *
    import datetime
    from lastActivityDate.models import UserActivity

    today = datetime.datetime.today()
    one_day = datetime.timedelta(days=5)
    today -= one_day
    print datetime.datetime.today()
    user_list = UserActivity.objects.filter(last_activity_date__gte=today)
    g = group(update_user_profile.s(i.user.auth.username) for i in user_list)
    print datetime.datetime.today()
    print g(user_list.count()).get()
If someone tries a manual update, it goes to the back of this queue and takes forever to be executed.
Is there a way to make this manual task run with priority? Or to use a dedicated, separate queue for each kind of update: manual and automatic?
Celery does not support task priority. (v3.0)
http://docs.celeryproject.org/en/master/faq.html#does-celery-support-task-priorities
You may solve this problem by routing tasks.
http://docs.celeryproject.org/en/latest/userguide/routing.html
Define the default and priority_high queues.
    from kombu import Queue

    CELERY_DEFAULT_QUEUE = 'default'
    CELERY_QUEUES = (
        Queue('default'),
        Queue('priority_high'),
    )
Run two worker daemons: one dedicated to priority_high, and one consuming both queues.
    user@x:/$ celery worker -Q priority_high
    user@y:/$ celery worker -Q default,priority_high
Then route the task.
    your_task.apply_async(args=['...'], queue='priority_high')
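Applied to the question, the routing above could be wrapped in a small helper so that manual updates go to priority_high and the daily bulk job stays on default. This is only a sketch: `queue_for` and `enqueue_profile_update` are hypothetical names, and the helper assumes nothing about the task beyond the `apply_async(args=..., queue=...)` call shown above.

```python
# Queue names are assumed to match the CELERY_QUEUES definition above.
DEFAULT_QUEUE = "default"
HIGH_PRIORITY_QUEUE = "priority_high"


def queue_for(manual):
    """Return the queue name for a manual or an automatic update."""
    return HIGH_PRIORITY_QUEUE if manual else DEFAULT_QUEUE


def enqueue_profile_update(task, username, manual=False):
    """Send one profile update to the queue matching its origin.

    `task` is expected to be a Celery task such as update_user_profile;
    only its apply_async(args=..., queue=...) method is used here.
    """
    return task.apply_async(args=[username], queue=queue_for(manual))
```

The view handling a manual update would then call `enqueue_profile_update(update_user_profile, username, manual=True)`, while the daily job keeps the default `manual=False`.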
If you use the RabbitMQ transport, configure your queues as follows in settings.py:
    from kombu import Queue
    ...
    CELERY_TASK_QUEUES = (
        Queue('default', routing_key='task_default.#', max_priority=10),
        ...
    )
Then run your tasks:
    my_low_prio_task.apply_async(args=(...), priority=1)
    my_high_prio_task.apply_async(args=(...), priority=10)
At the time of writing, this code works with kombu==4.6.11 and celery==4.4.6.
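To see why a priority=10 task overtakes thousands of priority=1 tasks already waiting, here is a toy in-memory model of a priority queue. It is not RabbitMQ or kombu code; `ToyPriorityQueue` is a hypothetical class that only mimics two behaviours: higher numbers are delivered first, and priorities above max_priority are treated as the maximum.

```python
import heapq
import itertools


class ToyPriorityQueue:
    """Toy stand-in for a queue declared with max_priority (illustration only)."""

    def __init__(self, max_priority=10):
        self.max_priority = max_priority
        self._heap = []
        self._order = itertools.count()  # tie-breaker keeps FIFO order per priority

    def publish(self, message, priority=0):
        # Priorities above the queue maximum are capped at the maximum.
        priority = min(priority, self.max_priority)
        # heapq is a min-heap, so negate the priority to pop the largest first.
        heapq.heappush(self._heap, (-priority, next(self._order), message))

    def consume(self):
        """Return the waiting message with the highest priority."""
        return heapq.heappop(self._heap)[2]
```

Publishing several messages with priority=1 and then one with priority=10 makes the first `consume()` call return the priority=10 message, which is the behaviour the manual update needs. On a real broker, messages a worker has already prefetched are not reordered, so the effect also depends on the worker's prefetch settings.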