 

Prioritizing queues among multiple queues in celery?

We are using Celery for our asynchronous background tasks, and we have two queues for tasks of different priorities. We have two clusters of nodes serving them separately. Things are working well, as expected.

Question:

We get mostly low priority tasks. For better resource utilization, I am wondering: is there a way to configure the workers listening to the high priority queue to listen to both queues, but take jobs from the high priority queue as long as any are there, and fall back to the low priority queue otherwise?

I have gone through the priority based task scheduling discussed at Celery Task Priority.

But my question is about prioritizing queues, not just tasks within a queue.

asked Sep 13 '17 by arunk2



2 Answers

You can partially achieve this by defining multiple queues for the worker when starting it.

You can do it with the following command (see the Celery documentation on routing for more details):

celery -A proj worker -l info -Q Q1,Q2
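
For context, here is a minimal sketch of how the two queues might be declared and routed to in celeryconfig.py. The queue names Q1/Q2 match the command above, while the task names are hypothetical and not from the original question:

# celeryconfig.py -- hypothetical routing setup for two priority queues
from kombu import Queue

task_queues = (
    Queue('Q1'),  # high priority queue
    Queue('Q2'),  # low priority queue
)

task_routes = {
    'proj.tasks.urgent_task': {'queue': 'Q1'},      # hypothetical task name
    'proj.tasks.background_task': {'queue': 'Q2'},  # hypothetical task name
}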

This approach has a problem, though: it does not give you that kind of fallback behaviour, since workers listening to multiple queues distribute their resources evenly among them.

Hence, your requirement of processing only from the high priority queue, even when there is something in the normal priority queue, cannot be achieved. The impact can be minimized by allocating more workers (maybe 75%) to the high priority queue and 25% to the normal priority queue, or a different split based on your workload, as sketched below.
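
As an illustration only (the 75/25 split, node names, and concurrency values are assumptions, not part of the original answer), the allocation could look like this on a single box:

celery -A proj worker -l info -n high1@%h -c 3 -Q Q1
celery -A proj worker -l info -n low1@%h -c 1 -Q Q2

Here three of the four worker processes consume only the high priority queue Q1, and one consumes only the normal priority queue Q2.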

answered Oct 13 '22 by Suresh


This is now possible with Celery >= 4.1.1 plus the Redis transport (probably earlier versions too). You just need to set a broker transport option in your celeryconfig.py module. This setting was implemented in Kombu 4.0.0.

broker_transport_options = {
    'visibility_timeout': 1200,        # unrelated to priority; just part of the Redis transport config
    'queue_order_strategy': 'priority',
}

It's also possible to specify this with an environment variable.

For a worker started with $ celery -A proj worker -l info -Q Q1,Q2, the idle worker will check Q1 first and execute Q1 tasks if available before checking Q2.

source
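
Putting it together, here is a minimal sketch of the same setting applied directly on the app object instead of through a celeryconfig.py module; the broker URL and app name are assumptions:

# assumes a local Redis broker; adjust the URL for your setup
from celery import Celery

app = Celery('proj', broker='redis://localhost:6379/0')
app.conf.broker_transport_options = {
    'queue_order_strategy': 'priority',  # consume queues in the order given instead of round-robin
}

With this in place, the worker command above ($ celery -A proj worker -l info -Q Q1,Q2) drains Q1 before it looks at Q2.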

Bonus, off-topic help: this also works with Airflow 1.10.2 workers, except the queue order does not seem to be preserved from the command line. Using 'queue_order_strategy': 'sorted' and naming your queues appropriately works (Q1, Q2 would sort perfectly). Airflow pool-based priority is not preserved across DAGs, so this really helps!

answered Oct 13 '22 by c-wilson