 

Celery worker: How to consume from all queues?

Tags:

python

celery

I have

  • set CELERY_CREATE_MISSING_QUEUES = True
  • NOT defined CELERY_QUEUES
  • defined CELERY_DEFAULT_QUEUE = 'default' (of type direct)
  • a custom router class that creates routes on the fly as shown in this ticket (https://github.com/celery/celery/issues/150).

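(For context, a dynamic router along the lines of that ticket can be sketched like this; the class name and the task-name-to-queue scheme below are my own illustration, not the exact code from the linked issue:)

```python
class DynamicRouter:
    """Sketch of a router that derives a queue on the fly from the task name.

    The naming scheme (first dotted component of the task name) is purely
    illustrative, not the code from the linked ticket.
    """

    def route_for_task(self, task, args=None, kwargs=None):
        # e.g. 'myapp.tasks.resize' -> queue 'myapp'
        queue = task.split('.')[0]
        return {
            'queue': queue,
            'exchange': queue,
            'exchange_type': 'direct',
            'routing_key': queue,
        }
```

With CELERY_CREATE_MISSING_QUEUES = True, any queue name such a router returns is declared automatically, which matches the behavior described above.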
I see that the new queue in the route returned by the custom router gets created which I assume is because of CELERY_CREATE_MISSING_QUEUES.

Now in the worker node that I run, I don't pass the -Q argument and it consumes only from the 'default' queue which seems to be in line with the documentation -

By default it will consume from all queues defined in the CELERY_QUEUES setting (which if not specified defaults to the queue named celery).

Is there any way to get my worker node to consume from ALL queues including the ones that are created dynamically?

Thanks,

Asked Sep 30 '14 by ksrini

People also ask

How many tasks can celery handle?

Celery beat only triggers the scheduled tasks (per the crontab schedule); it does not run them itself, the workers do.

What is a soft time limit?

Soft, or hard? The time limit is set as two values, soft and hard. The soft time limit allows the task to catch an exception and clean up before it is killed; the hard time limit is not catchable and force-terminates the task.
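The soft-versus-hard idea can be sketched in plain Python with signal.alarm (a toy Unix-only model of the mechanism, not Celery's implementation):

```python
import signal

class SoftTimeLimitExceeded(Exception):
    """Stand-in for celery.exceptions.SoftTimeLimitExceeded (illustration only)."""

def _soft_limit_handler(signum, frame):
    raise SoftTimeLimitExceeded()

def run_with_soft_limit(func, seconds):
    """Run func, raising a catchable exception if it exceeds the soft limit."""
    signal.signal(signal.SIGALRM, _soft_limit_handler)
    signal.alarm(seconds)          # arm the soft limit
    try:
        return func()
    except SoftTimeLimitExceeded:
        return 'cleaned up'        # the task gets a chance to clean up
    finally:
        signal.alarm(0)            # disarm
```

A hard limit, by contrast, would kill the worker process outright with no chance to catch anything.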

What is rate limit in celery?

The rate limits can be specified in seconds, minutes or hours by appending "/s", "/m" or "/h" to the value. Tasks will be evenly distributed over the specified time frame. Example: "100/m" (hundred tasks a minute). This will enforce a minimum delay of 600 ms between starting two tasks on the same worker instance.
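The arithmetic behind that 600 ms figure can be sketched like this (a sketch of the calculation only, not Celery's internal parsing code):

```python
def min_delay_seconds(rate_limit):
    """Minimum delay between task starts implied by a rate-limit string
    like '100/m'. Illustrative parsing, not Celery's implementation."""
    count, unit = rate_limit.split('/')
    per = {'s': 1, 'm': 60, 'h': 3600}[unit]
    return per / int(count)

# '100/m' -> 60 / 100 = 0.6 s between starts on one worker
```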

How does a celery worker work?

Dedicated worker processes constantly monitor task queues for new work to perform. Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task the client adds a message to the queue, the broker then delivers that message to a worker.
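That client-to-broker-to-worker flow can be sketched with a plain in-process queue (a toy model only, not Celery itself):

```python
import queue
import threading

broker = queue.Queue()   # stands in for the message broker
results = []

def worker():
    # a worker constantly monitors the queue for new messages
    while True:
        message = broker.get()
        if message is None:          # shutdown sentinel
            break
        results.append(message['task'] + ' done')

t = threading.Thread(target=worker)
t.start()
broker.put({'task': 'resize'})       # the client adds a message to the queue
broker.put(None)                     # tell the worker to stop
t.join()
```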


1 Answer

The worker needs to be told about these automatically or dynamically created queues. That means you need a way to discover the queue names: store them yourself when you create them, or query the broker (for example with rabbitmqctl list_queues if you're using RabbitMQ as the broker). Then, for example, add a signal handler so the worker starts consuming from these dynamic queues.

For example using celeryd_after_setup signal:

from celery.signals import celeryd_after_setup

@celeryd_after_setup.connect
def add_dynamic_queue(sender, instance, **kwargs):
    # get the dynamic queue, maybe stored somewhere
    queue = 'dynamic_queue'
    instance.app.amqp.queues.select_add(queue)

If you always have new dynamic queues created, you can also command the workers to start consuming from these queues at runtime using:

# command all workers to consume from the 'dynamic_queue' queue
app.control.add_consumer('dynamic_queue', reply=True)

# command a specific worker
app.control.add_consumer('dynamic_queue', reply=True, destination=['w1@example'])

See Adding Consumers.
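Putting the two pieces together, a periodic sync step might look like this (a sketch: the control object is anything with Celery's add_consumer signature, such as app.control, and the list of dynamic queue names comes from wherever you stored them or from the broker):

```python
def sync_worker_queues(control, dynamic_queues, known):
    """Command workers to consume from any newly discovered queues.

    control        -- object exposing add_consumer(queue, reply=True),
                      e.g. app.control in Celery
    dynamic_queues -- iterable of queue names discovered from your
                      store or from the broker
    known          -- set of queue names already added to the workers
    """
    for q in dynamic_queues:
        if q not in known:
            control.add_consumer(q, reply=True)
            known.add(q)
    return known
```

Running this on a schedule (or from the hook that creates the queues) keeps the workers' consumer set in step with the dynamically created queues.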

I hope this helps; I'll edit the answer when I get more info about this.

Answered Sep 18 '22 by Pierre