
Celery tasks being sent to multiple queues

I've noticed that Celery has been sending a single task to multiple queues, and workers on both queues have been executing it.

My queue definitions are:

CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('client1', Exchange('client1'), routing_key='client1'),
    Queue('images', Exchange('media'), routing_key='media.images'),
)

And when, after stopping all my workers, I run:

>>> tasks.ping.apply_async(queue='default')

I can see the task appear in both the default and client1 queues:

$ redis-cli -c llen default
(integer) 1
$ redis-cli -c llen client1
(integer) 1

This only happens with the default queue. Sending the task directly to the client1 queue adds it only there:

>>> tasks.ping.apply_async(queue='client1')
$ redis-cli -c llen default
(integer) 1
$ redis-cli -c llen client1
(integer) 2

The images queue never receives tasks incorrectly.

This is Celery 3.1.15 with the Redis broker.

David Wolever asked Oct 28 '15 19:10



1 Answer

Okay! It looks like the problem is that Kombu's Redis transport doesn't clear old bindings (exchange + routing key) when the queue configuration changes.

Initially I had configured the queues:

CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('client1', Exchange('default'), routing_key='default'),
)

And later changed them to use a separate exchange and routing key for client1.

But for some reason Kombu didn't clear out the old bindings, so I was left with:

redis> smembers _kombu.binding.default
1) "default\x06\x16\x06\x16client1"
2) "default.client1\x06\x16\x06\x16client1"
3) "default\x06\x16\x06\x16default"

So tasks sent to the default exchange with routing key default matched the stale binding to client1 as well as the correct binding to default, and were routed to both queues.
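To make the effect of those stale bindings concrete, here is a minimal sketch in plain Python (no Redis connection needed). It assumes each set member has the form routing_key + separator + pattern + separator + queue, with the "\x06\x16" separator visible in the output above, and that a direct exchange delivers to every queue whose binding key equals the message's routing key. The helper names parse_binding and queues_for are made up for illustration and are not part of Kombu.

```python
# Separator assumed from the raw set members shown above.
SEP = "\x06\x16"


def parse_binding(member):
    """Split a raw binding member into (routing_key, pattern, queue)."""
    if isinstance(member, bytes):
        member = member.decode("utf-8")
    routing_key, pattern, queue = member.split(SEP)
    return routing_key, pattern, queue


def queues_for(members, routing_key):
    """Direct-exchange style lookup: every queue bound with exactly this key."""
    bindings = (parse_binding(m) for m in members)
    return sorted({queue for rkey, _pattern, queue in bindings if rkey == routing_key})


# The three members found in _kombu.binding.default above.
stale_members = [
    "default\x06\x16\x06\x16client1",
    "default.client1\x06\x16\x06\x16client1",
    "default\x06\x16\x06\x16default",
]

print(queues_for(stale_members, "default"))  # ['client1', 'default']
```

With only the third member left in the set, the same lookup returns just the default queue.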

The fix was to remove the incorrect bindings:

redis> srem _kombu.binding.default "default\x06\x16\x06\x16client1"
redis> srem _kombu.binding.default "default.client1\x06\x16\x06\x16client1"
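If there are many exchanges to check, the same cleanup could be scripted. The following is only a sketch under assumptions: the key name follows the _kombu.binding.<exchange> pattern seen above, members use the "\x06\x16" separator, and client is any object with smembers and srem methods (for example a redis-py redis.Redis instance). The function remove_stale_bindings and its expected argument are hypothetical names, not Kombu API. It defaults to a dry run so nothing is deleted until the reported list has been reviewed.

```python
# Separator assumed from the raw set members shown above.
SEP = "\x06\x16"


def remove_stale_bindings(client, exchange, expected, dry_run=True):
    """Report (and optionally remove) bindings not listed in `expected`.

    `expected` is a set of (routing_key, pattern, queue) tuples that should
    remain bound to `exchange`. Returns the stale members as sorted strings.
    """
    key = "_kombu.binding.%s" % exchange
    stale = []
    for member in client.smembers(key):
        text = member.decode("utf-8") if isinstance(member, bytes) else member
        if tuple(text.split(SEP)) not in expected:
            stale.append(text)
            if not dry_run:
                # Pass the original member so bytes/str matches what Redis holds.
                client.srem(key, member)
    return sorted(stale)


# Hypothetical usage with redis-py, keeping only the default -> default binding:
#   import redis
#   client = redis.Redis()
#   remove_stale_bindings(client, "default", {("default", "", "default")})
```

Stop the workers before running it with dry_run=False, and check the returned list first, since a binding removed by mistake means tasks silently stop reaching that queue.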
David Wolever answered Oct 16 '22 06:10