I have a test
queue in celery and I have defined a task for it:
@celery_app.task(queue='test', ignore_result=True)
def priority_test(priority):
print(priority)
which just print the argument.I want to set the priority
attribute which is defined here for appy_async
. So, I wrote a for loop
like this:
for i in range(100):
priority_test.apply_async((i%10,), queue="test", priority=i%10)
I excpected to see some result like this:
[2017-12-26 17:21:37,309: WARNING/ForkPoolWorker-1] 10
[2017-12-26 17:21:37,311: WARNING/ForkPoolWorker-1] 10
[2017-12-26 17:21:37,314: WARNING/ForkPoolWorker-1] 10
[2017-12-26 17:21:37,317: WARNING/ForkPoolWorker-1] 9
[2017-12-26 17:21:37,319: WARNING/ForkPoolWorker-1] 9
[2017-12-26 17:21:37,321: WARNING/ForkPoolWorker-1] 9
[2017-12-26 17:21:37,323: WARNING/ForkPoolWorker-1] 8
[2017-12-26 17:21:37,326: WARNING/ForkPoolWorker-1] 8
[2017-12-26 17:21:37,329: WARNING/ForkPoolWorker-1] 8
[2017-12-26 17:21:37,332: WARNING/ForkPoolWorker-1] 7
[2017-12-26 17:21:37,334: WARNING/ForkPoolWorker-1] 7
[2017-12-26 17:21:37,336: WARNING/ForkPoolWorker-1] 7
[2017-12-26 17:21:37,341: WARNING/ForkPoolWorker-1] 6
[2017-12-26 17:21:37,344: WARNING/ForkPoolWorker-1] 6
[2017-12-26 17:21:37,346: WARNING/ForkPoolWorker-1] 6
[2017-12-26 17:21:37,349: WARNING/ForkPoolWorker-1] 5
[2017-12-26 17:21:37,351: WARNING/ForkPoolWorker-1] 5
[2017-12-26 17:21:37,353: WARNING/ForkPoolWorker-1] 5
[2017-12-26 17:21:37,355: WARNING/ForkPoolWorker-1] 4
[2017-12-26 17:21:37,358: WARNING/ForkPoolWorker-1] 4
[2017-12-26 17:21:37,360: WARNING/ForkPoolWorker-1] 4
means execute the same priorities after each other but it executed them in the normal way:
[2017-12-26 17:21:37,309: WARNING/ForkPoolWorker-1] 10
[2017-12-26 17:21:37,311: WARNING/ForkPoolWorker-1] 9
[2017-12-26 17:21:37,314: WARNING/ForkPoolWorker-1] 8
[2017-12-26 17:21:37,317: WARNING/ForkPoolWorker-1] 7
[2017-12-26 17:21:37,319: WARNING/ForkPoolWorker-1] 6
[2017-12-26 17:21:37,321: WARNING/ForkPoolWorker-1] 5
[2017-12-26 17:21:37,323: WARNING/ForkPoolWorker-1] 4
[2017-12-26 17:21:37,326: WARNING/ForkPoolWorker-1] 3
[2017-12-26 17:21:37,329: WARNING/ForkPoolWorker-1] 2
[2017-12-26 17:21:37,332: WARNING/ForkPoolWorker-1] 1
[2017-12-26 17:21:37,334: WARNING/ForkPoolWorker-1] 10
[2017-12-26 17:21:37,336: WARNING/ForkPoolWorker-1] 9
[2017-12-26 17:21:37,341: WARNING/ForkPoolWorker-1] 8
[2017-12-26 17:21:37,344: WARNING/ForkPoolWorker-1] 7
[2017-12-26 17:21:37,346: WARNING/ForkPoolWorker-1] 6
[2017-12-26 17:21:37,349: WARNING/ForkPoolWorker-1] 5
[2017-12-26 17:21:37,351: WARNING/ForkPoolWorker-1] 4
[2017-12-26 17:21:37,353: WARNING/ForkPoolWorker-1] 3
[2017-12-26 17:21:37,355: WARNING/ForkPoolWorker-1] 2
[2017-12-26 17:21:37,358: WARNING/ForkPoolWorker-1] 1
[2017-12-26 17:21:37,360: WARNING/ForkPoolWorker-1] 10
[2017-12-26 17:21:37,362: WARNING/ForkPoolWorker-1] 9
[2017-12-26 17:21:37,364: WARNING/ForkPoolWorker-1] 8
[2017-12-26 17:21:37,365: WARNING/ForkPoolWorker-1] 7
[2017-12-26 17:21:37,367: WARNING/ForkPoolWorker-1] 6
[2017-12-26 17:21:37,369: WARNING/ForkPoolWorker-1] 5
[2017-12-26 17:21:37,371: WARNING/ForkPoolWorker-1] 4
[2017-12-26 17:21:37,373: WARNING/ForkPoolWorker-1] 3
[2017-12-26 17:21:37,374: WARNING/ForkPoolWorker-1] 2
[2017-12-26 17:21:37,376: WARNING/ForkPoolWorker-1] 1
How should I apply priority
in celery with rabbitmq and what is the priority
attribute in the doc above?
apply_async(args[, kwargs[, …]]) Sends a task message. delay(*args, **kwargs) Shortcut to send a task message, but doesn't support execution options. calling ( __call__ )
Python solves this by using a binary heap to implement the priority queue. The Python priority queue is built on the heapq module, which is basically a binary heap. The get command dequeues the highest priority elements from the queue.
Process of Task Execution by Celery can be broken down into:Your application sends the tasks to the task broker, it is then reserved by a worker for execution & finally the result of task execution is stored in the result backend.
Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task, the Celery client adds a message to the queue, and the broker then delivers that message to a worker. The most commonly used brokers are Redis and RabbitMQ.
In order to have priority
working properly you need to properly configure a couple of settings and you need at least version 3.5.0 of RabbitMQ.
First set the x-max-priority
of your queue to 10. From the docs :
from kombu import Exchange, Queue
app.conf.task_queues = [
Queue('tasks', Exchange('tasks'), routing_key='tasks',
queue_arguments={'x-max-priority': 10},
]
A default value for all queues can be set using the task_queue_max_priority
setting:
app.conf.task_queue_max_priority = 10
Then configure the following settings:
CELERY_ACKS_LATE = True
CELERYD_PREFETCH_MULTIPLIER = 1
By default the prefetch multiplier is 4, which in your case will cause the first 4 tasks with priority 10, 9, 8 and 7 to be fetched before the other tasks are present in the queue. The CELERY_ACKS_LATE
setting will cause the tasks to be acknowledged after they have been executed. You can experiment with this setting to see what behaviour you prefer.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With