 

celery - Tasks that need to run in priority

On my website, users can update their profile manually whenever they want, or it is updated automatically once a day.

This task is being distributed with celery now.

But I have a problem:

Every day, for the automatic update, a job puts ALL users (roughly 6k) on the queue:

```python
from celery import group
from tasks import *
import datetime
from lastActivityDate.models import UserActivity

today   = datetime.datetime.today()
one_day = datetime.timedelta(days=5)
today  -= one_day

print datetime.datetime.today()

user_list = UserActivity.objects.filter(last_activity_date__gte=today)
g = group(update_user_profile.s(i.user.auth.username) for i in user_list)

print datetime.datetime.today()
print g(user_list.count()).get()
```

If someone triggers a manual update, it lands at the back of that queue and takes forever to execute.

Is there a way to give this manual task priority? Or to make a dedicated queue for each kind: manual and automatic?

fabriciols asked Apr 04 '13 11:04

People also ask

How does Celery execute tasks?

Celery is a task queue/job queue based on asynchronous message passing. It can be used as a background task processor for your application, into which you dump tasks to execute in the background or at any given moment. It can be configured to execute your tasks synchronously or asynchronously.

How many tasks can Celery handle?

Celery beat only triggers the scheduled tasks (via the crontab schedule); it does not run them itself, the workers do.

What method do you use for task prioritization in Python?

Python solves this by using a binary heap to implement the priority queue. The Python priority queue (queue.PriorityQueue) is built on the heapq module, which is basically a binary heap. The get method dequeues the highest-priority element from the queue, and priority-object pairs can be inserted into it.
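A minimal stdlib illustration of the above (note that queue.PriorityQueue treats the LOWEST priority value as "highest priority"):

```python
import queue

# queue.PriorityQueue is built on the heapq module (a binary heap).
# Entries here are (priority, item) pairs; the lowest priority value
# is dequeued first.
q = queue.PriorityQueue()
q.put((3, 'low'))
q.put((1, 'high'))
q.put((2, 'medium'))

order = [q.get()[1] for _ in range(3)]
print(order)  # ['high', 'medium', 'low']
```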

Does Celery run tasks in parallel?

Yes. A common pattern is a workflow that runs a startup task, then parallelizes multiple worker tasks, and then fires off a reducer task. If passing results around is important, a chord can be used instead for task2 and task3.
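A rough sketch of that fan-out/fan-in shape using only the standard library (in Celery, a group would play the parallel step and a chord the reducer; the function names here are made up for the sketch):

```python
from concurrent.futures import ThreadPoolExecutor

def startup():
    return [1, 2, 3, 4]          # task1: produce the work items

def worker(item):
    return item * item           # task2: per-item work, run in parallel

def reducer(results):
    return sum(results)          # task3: combine the partial results

items = startup()
with ThreadPoolExecutor(max_workers=4) as pool:
    partials = list(pool.map(worker, items))
print(reducer(partials))  # 30
```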


2 Answers

Celery (v3.0) does not support task priorities.

http://docs.celeryproject.org/en/master/faq.html#does-celery-support-task-priorities

You may solve this problem by routing tasks.

http://docs.celeryproject.org/en/latest/userguide/routing.html

Prepare a default and a priority_high queue:

```python
from kombu import Queue

CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default'),
    Queue('priority_high'),
)
```

Run two daemons:

```shell
user@x:/$ celery worker -Q priority_high
user@y:/$ celery worker -Q default,priority_high
```

And route the task:

```python
your_task.apply_async(args=['...'], queue='priority_high')
```
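Alternatively, if the manual path used a separately named task, the routing could live in settings instead of at every call site; a sketch, where the task name is an assumption based on the question, not something the answer specifies:

```python
# settings.py -- hypothetical task name; adjust to your real task path.
# Any task not listed here falls back to CELERY_DEFAULT_QUEUE.
CELERY_ROUTES = {
    'tasks.update_user_profile_manual': {'queue': 'priority_high'},
}
```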
Satoshi Yoshinaga answered Sep 22 '22 00:09


If you use the RabbitMQ transport, configure your queues the following way in settings.py:

```python
from kombu import Queue
...
CELERY_TASK_QUEUES = (
    Queue('default', routing_key='task_default.#', max_priority=10),
    ...
)
```

Then run your tasks:

```python
my_low_prio_task.apply_async(args=(...), priority=1)
my_high_prio_task.apply_async(args=(...), priority=10)
```

Presently this code works for kombu==4.6.11, celery==4.4.6.
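One caveat, not stated in the answer above: broker-level priorities can only reorder messages that are still waiting in the queue, so a worker that prefetches a large batch of messages can mask them. A settings sketch for Celery 4.x (new-style lowercase setting names):

```python
# Celery 4.x settings sketch.
# With a large prefetch window, a worker reserves low-priority messages
# before high-priority ones arrive, defeating the broker's reordering.
worker_prefetch_multiplier = 1
# Ack after the task finishes, so reserved tasks are requeued on a crash.
task_acks_late = True
```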

Andrey St answered Sep 20 '22 00:09