I want to manage tasks using Celery. I want to have a single task queue (with concurrency 1) and be able to push tasks onto the queue with different priorities such that higher priority tasks will preempt the others.
I am adding three tasks to a queue like so:
add_tasks.py
from tasks import example_task
example_task.apply_async((1,), priority=1)
example_task.apply_async((2,), priority=3)
example_task.apply_async((3,), priority=2)
I have the following configuration:
tasks.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
from kombu import Queue, Exchange
import time
app = Celery('tasks', backend='rpc://', broker='pyamqp://')
app.conf.task_queues = [
    Queue('celery', Exchange('celery'), routing_key='celery',
          queue_arguments={'x-max-priority': 10}),
]
@app.task
def example_task(task_num):
    time.sleep(3)
    print('Started {}'.format(task_num))
    return True
I expect the second task I added to run before the third because it has a higher priority, but it doesn't. The tasks run in the order they were added.
I am following the docs and thought I had configured the app correctly.
Am I doing something wrong or am I misunderstanding the priority feature?
It is possible that the queue never gets a chance to prioritize the messages, because the worker prefetches them before any sorting can happen. Try these two settings (adapt them to your project as needed):
CELERY_ACKS_LATE = True
CELERYD_PREFETCH_MULTIPLIER = 1
The prefetch multiplier is 4 by default, so a worker with concurrency 1 can reserve up to four messages at once.
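To see why the prefetch limit matters, here is a rough pure-Python simulation (it does not use Celery or RabbitMQ). It assumes that all tasks are published while the first one is still running, that a running task counts against the prefetch limit (which is what late acknowledgement gives you), and that a higher number means a higher priority. The function name run_order and the priorities used are made up for illustration and are not the ones from the question.

```python
import heapq
from collections import deque


def run_order(published, prefetch_limit):
    """Return the order in which a single worker would execute tasks.

    published: list of (task_num, priority) tuples in publish order.
    prefetch_limit: maximum number of unacknowledged messages the
    worker may hold, including the one it is currently running.
    """
    broker = []         # heap of (-priority, seq, task_num): the broker-side queue
    reserved = deque()  # messages already delivered to the worker (FIFO)

    def deliver():
        # The broker hands out its highest-priority message while the
        # worker still has room under the prefetch limit.
        while broker and len(reserved) < prefetch_limit:
            reserved.append(heapq.heappop(broker)[2])

    # Publishing phase: the first task is still running throughout.
    for seq, (task_num, priority) in enumerate(published):
        heapq.heappush(broker, (-priority, seq, task_num))
        deliver()

    # Execution phase: finish one task, acknowledge it, then refill.
    executed = []
    while reserved:
        executed.append(reserved.popleft())
        deliver()
    return executed


tasks = [(1, 1), (2, 2), (3, 3)]
print(run_order(tasks, prefetch_limit=4))  # [1, 2, 3]
print(run_order(tasks, prefetch_limit=1))  # [1, 3, 2]
```

With a limit of 4, every message is handed to the worker as soon as it is published, so the broker never holds two messages at once and has nothing to sort. With a limit of 1, tasks 2 and 3 wait in the broker, which can then deliver the higher-priority one first.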
I developed a small sample application to try out Celery's task priorities. Please have a look at it here. While developing it, I ran into a very similar problem, and this change in settings solved it.
Note that you also require RabbitMQ version 3.5.0 or higher.
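If you configure the app through app.conf with lowercase setting names, as the tasks.py in the question does, the two settings have lowercase equivalents in Celery 4 and later. A minimal sketch, assuming the same app definition as in the question:

```python
from celery import Celery

app = Celery('tasks', backend='rpc://', broker='pyamqp://')

# Lowercase (Celery 4+) names for CELERY_ACKS_LATE and
# CELERYD_PREFETCH_MULTIPLIER.
app.conf.task_acks_late = True
app.conf.worker_prefetch_multiplier = 1
```

Together with a worker concurrency of 1, this should leave at most one unacknowledged message on the worker at a time, so the remaining messages stay in the RabbitMQ queue where they can be ordered by priority.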