I have:

CELERY_CREATE_MISSING_QUEUES = True
CELERY_QUEUES
CELERY_DEFAULT_QUEUE = 'default' (of type direct)

I see that the new queue in the route returned by the custom router gets created, which I assume is because of CELERY_CREATE_MISSING_QUEUES.
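For context, a custom router in this (old-style) configuration is just an object with a route_for_task method; Celery asks each router for a route and falls back to the default queue when all of them return None. A minimal sketch, assuming a hypothetical DynamicRouter class and 'customer.*' task names (neither is from the question):

```python
# Illustrative router: Celery calls route_for_task(task, args, kwargs) on
# each router listed in CELERY_ROUTES until one returns a routing dict.
class DynamicRouter(object):
    def route_for_task(self, task, args=None, kwargs=None):
        # Send per-customer tasks to a queue of their own; with
        # CELERY_CREATE_MISSING_QUEUES = True each queue is created on demand.
        if task.startswith('customer.'):
            customer_id = args[0] if args else 'unknown'
            return {'queue': 'customer_%s' % customer_id}
        # None means: fall through to the next router / the default queue.
        return None
```

Each such dynamically named queue is exactly the kind the worker does not consume from by default.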
Now, in the worker node that I run, I don't pass the -Q argument, and it consumes only from the 'default' queue, which seems to be in line with the documentation:
By default it will consume from all queues defined in the CELERY_QUEUES setting (which if not specified defaults to the queue named celery).
Is there any way to get my worker node to consume from ALL queues including the ones that are created dynamically?
Thanks,
Celery beat only triggers those 1000 tasks (per the crontab schedule); it does not run them.
Soft, or hard? The time limit is set in two values, soft and hard. The soft time limit lets the task catch an exception and clean up before it is killed; the hard time limit is not catchable and force-terminates the task.
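As a sketch of that catch-and-clean-up pattern (the app, do_work and cleanup names are hypothetical; the two keyword arguments are Celery's per-task limit options, in seconds):

```python
from celery.exceptions import SoftTimeLimitExceeded

@app.task(soft_time_limit=60, time_limit=120)  # hard limit > soft limit
def process(path):
    try:
        do_work(path)          # hypothetical long-running work
    except SoftTimeLimitExceeded:
        # Soft limit hit: we still get a chance to clean up before
        # the hard limit force-terminates the task.
        cleanup(path)
```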
The rate limits can be specified in seconds, minutes or hours by appending “/s”, “/m” or “/h” to the value. Tasks will be evenly distributed over the specified time frame. Example: “100/m” (hundred tasks a minute). This will enforce a minimum delay of 600ms between starting two tasks on the same worker instance.
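The 600ms figure is just the even-spacing arithmetic: the window length divided by the number of tasks. A quick check (the helper name is only for illustration):

```python
def min_delay_ms(tasks, window_seconds):
    # Evenly spreading `tasks` starts over the window gives this minimum
    # delay between two consecutive task starts on one worker.
    return window_seconds * 1000.0 / tasks

# "100/m" is 100 tasks per 60-second window: min_delay_ms(100, 60) -> 600.0
```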
Dedicated worker processes constantly monitor task queues for new work to perform. Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task the client adds a message to the queue, the broker then delivers that message to a worker.
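That message flow can be pictured with a toy example, using a stdlib queue to stand in for the broker (everything here is illustrative, not Celery's actual wire format):

```python
import json
import queue

broker = queue.Queue()  # stands in for RabbitMQ/Redis

def client_send(task_name, args):
    # The client never runs the task: it only puts a serialized
    # message describing the task onto the broker's queue.
    broker.put(json.dumps({'task': task_name, 'args': args}))

def worker_poll(registry):
    # A worker pulls the next message and executes the matching task.
    msg = json.loads(broker.get())
    return registry[msg['task']](*msg['args'])
```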
The worker needs to be told about these automatically or dynamically created queues. So you need a way to get these queue names: store them when you create them, or fetch them from rabbitmqctl list_queues if you're using RabbitMQ as the broker. Then, for example, add a signal handler that makes the workers consume from these dynamic queues.
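If you go the rabbitmqctl route, the command's output has to be parsed. A sketch of that parsing, assuming the classic tab-separated name/message-count output with a "Listing queues ..." banner (the exact format varies between RabbitMQ versions):

```python
def parse_queue_names(output):
    """Extract queue names from `rabbitmqctl list_queues` output."""
    names = []
    for line in output.splitlines():
        line = line.strip()
        # Skip the banner/footer lines rabbitmqctl prints around the table.
        if not line or line.startswith('Listing') or line.startswith('...'):
            continue
        names.append(line.split('\t')[0])
    return names
```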
For example, using the celeryd_after_setup signal:

from celery.signals import celeryd_after_setup

@celeryd_after_setup.connect
def add_dynamic_queue(sender, instance, **kwargs):
    # Get the dynamic queue name, e.g. from wherever it was stored
    queue = 'dynamic_queue'
    # Make this worker also consume from that queue
    instance.app.amqp.queues.select_add(queue)
If you always have new dynamic queues being created, you can also command the workers to start consuming from these queues at runtime using:

# Command all workers to consume from the 'dynamic_queue' queue
app.control.add_consumer('dynamic_queue', reply=True)

# Command specific workers only
app.control.add_consumer('dynamic_queue', reply=True, destination=['w1@example'])
See Adding Consumers.
I hope this helps. I'll edit the answer when I get more info about this.