I've noticed that Celery has been sending tasks to multiple queues, and workers on both queues have been executing them.
My queue definitions are:
from kombu import Exchange, Queue

CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('client1', Exchange('client1'), routing_key='client1'),
    Queue('images', Exchange('media'), routing_key='media.images'),
)
And when, after stopping all my workers, I run:
>>> tasks.ping.apply_async(queue='default')
I can see the task appear in both the default and client1 queues:
$ redis-cli -c llen default
(integer) 1
$ redis-cli -c llen client1
(integer) 1
This only applies to the default queue. Sending it directly to the client1 queue only adds it there:
>>> tasks.ping.apply_async(queue='client1')
$ redis-cli -c llen default
(integer) 1
$ redis-cli -c llen client1
(integer) 2
The images queue never receives tasks incorrectly.
This is Celery 3.1.15 with the Redis broker.
Okay! It looks like the problem is that Kombu's Redis transport doesn't clear out old bindings (exchange and routing key pairs) when the queue configuration changes.
Initially I had configured the queues:
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('client1', Exchange('default'), routing_key='default'),
)
And later changed them to use a separate exchange and routing key for client1.
But for some reason Kombu didn't clear out the old bindings, so I was left with:
redis> smembers _kombu.binding.default
1) "default\x06\x16\x06\x16client1"
2) "default.client1\x06\x16\x06\x16client1"
3) "default\x06\x16\x06\x16default"
So tasks sent to default were being routed to both the default and client1 queues.
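As a rough illustration of why this happens, here is a simplified model of a direct-exchange lookup over those three entries. The \x06\x16 separator and the field order (routing key, pattern, queue) are read off the smembers output above; queues_for is a hypothetical helper for illustration, not Kombu's actual code.

```python
# Separator between fields, as seen in the smembers output above.
SEP = "\x06\x16"

# The three entries found in _kombu.binding.default.
BINDINGS = [
    "default\x06\x16\x06\x16client1",
    "default.client1\x06\x16\x06\x16client1",
    "default\x06\x16\x06\x16default",
]


def queues_for(routing_key, bindings=BINDINGS):
    """Return every queue bound to the exchange under this routing key."""
    queues = set()
    for entry in bindings:
        bound_key, _pattern, queue = entry.split(SEP)
        if bound_key == routing_key:
            queues.add(queue)
    return queues


print(sorted(queues_for("default")))  # ['client1', 'default']
```

Because the stale entry still binds client1 under the default routing key, a message published with that key matches two queues instead of one.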
The fix was to remove the incorrect bindings:
redis> srem _kombu.binding.default "default\x06\x16\x06\x16client1"
redis> srem _kombu.binding.default "default.client1\x06\x16\x06\x16client1"
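The same cleanup could be scripted from Python. This is only a sketch, assuming the binding format shown above and a client object that exposes smembers and srem (as a redis-py client does); the function names and the expected set are made up for this example. It defaults to a dry run so nothing is deleted until the list of stale entries has been checked.

```python
SEP = "\x06\x16"  # field separator seen in the binding entries


def parse_binding(member):
    """Split a binding entry into (routing_key, pattern, queue)."""
    if isinstance(member, bytes):
        member = member.decode("utf-8")
    routing_key, pattern, queue = member.split(SEP)
    return routing_key, pattern, queue


def stale_bindings(members, expected):
    """Return entries whose (routing_key, queue) pair is not expected."""
    stale = []
    for member in members:
        routing_key, _pattern, queue = parse_binding(member)
        if (routing_key, queue) not in expected:
            stale.append(member)
    return stale


def prune_bindings(client, exchange, expected, dry_run=True):
    """List stale bindings for an exchange; remove them unless dry_run."""
    key = "_kombu.binding.%s" % exchange
    stale = stale_bindings(client.smembers(key), expected)
    if stale and not dry_run:
        client.srem(key, *stale)
    return stale
```

With redis-py installed, something like prune_bindings(redis.Redis(), 'default', {('default', 'default')}) would report the two leftover client1 entries, and passing dry_run=False would remove them. Stop the workers first and check the dry-run output before deleting anything.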