I am running Celery 2.2.4/djCelery 2.2.4, using RabbitMQ 2.1.1 as a backend. I recently brought two new Celery servers online. Previously I had been running 2 workers across two machines with a total of ~18 threads; on my new souped-up boxes (36GB RAM + dual hyper-threaded quad-core CPUs), I am running 10 workers with 8 threads each, for a total of 180 threads. My tasks are all pretty small, so this should be fine.
The nodes have been running fine for the last few days, but today I noticed that .delay() is hanging. When I interrupt it, I see a traceback that points here:
File "/home/django/deployed/releases/20110608183345/virtual-env/lib/python2.5/site-packages/celery/task/base.py", line 324, in delay
return self.apply_async(args, kwargs)
File "/home/django/deployed/releases/20110608183345/virtual-env/lib/python2.5/site-packages/celery/task/base.py", line 449, in apply_async
publish.close()
File "/home/django/deployed/virtual-env/lib/python2.5/site-packages/kombu/compat.py", line 108, in close
self.backend.close()
File "/home/django/deployed/virtual-env/lib/python2.5/site-packages/amqplib/client_0_8/channel.py", line 194, in close
(20, 41), # Channel.close_ok
File "/home/django/deployed/virtual-env/lib/python2.5/site-packages/amqplib/client_0_8/abstract_channel.py", line 89, in wait
self.channel_id, allowed_methods)
File "/home/django/deployed/virtual-env/lib/python2.5/site-packages/amqplib/client_0_8/connection.py", line 198, in _wait_method
self.method_reader.read_method()
File "/home/django/deployed/virtual-env/lib/python2.5/site-packages/amqplib/client_0_8/method_framing.py", line 212, in read_method
self._next_method()
File "/home/django/deployed/virtual-env/lib/python2.5/site-packages/amqplib/client_0_8/method_framing.py", line 127, in _next_method
frame_type, channel, payload = self.source.read_frame()
File "/home/django/deployed/virtual-env/lib/python2.5/site-packages/amqplib/client_0_8/transport.py", line 109, in read_frame
frame_type, channel, size = unpack('>BHI', self._read(7))
File "/home/django/deployed/virtual-env/lib/python2.5/site-packages/amqplib/client_0_8/transport.py", line 200, in _read
s = self.sock.recv(65536)
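So the client is parked in a blocking sock.recv(), waiting for a Channel.close_ok that never arrives. For anyone who wants to check whether the broker is answering the AMQP handshake at all, here is a raw-socket probe with a timeout -- a diagnostic sketch only, not part of Celery; I believe "AMQP\x01\x01\x08\x00" is the protocol header that amqplib's client_0_8 sends, and the host is a placeholder:

import socket

def probe_amqp(host, port=5672, timeout=5.0):
    """Send the AMQP 0-8 protocol header and wait briefly for the
    broker's first frame. Returns True if the broker answers."""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(timeout)  # unlike amqplib's blocking read, never wait forever
    try:
        s.connect((host, port))
        s.sendall("AMQP\x01\x01\x08\x00")  # protocol header for AMQP 0-8
        data = s.recv(7)                   # frame header: type(1) + channel(2) + size(4)
        return len(data) == 7
    except socket.timeout:
        return False  # TCP connected, but the broker never sent a frame
    finally:
        s.close()

print probe_amqp("x.x.x.x")  # Python 2 print, matching the python2.5 hosts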
I've checked the Rabbit logs, and I see the process trying to connect:
=INFO REPORT==== 12-Jun-2011::22:58:12 ===
accepted TCP connection on 0.0.0.0:5672 from x.x.x.x:48569
I have my Celery log level set to INFO, but I don't see anything particularly interesting in the Celery logs EXCEPT that 2 of the workers can't connect to the broker:
[2011-06-12 22:41:08,033: ERROR/MainProcess] Consumer: Connection to broker lost. Trying to re-establish connection...
All of the other nodes are able to connect without issue.
I know that there was a posting (RabbitMQ / Celery with Django hangs on delay/ready/etc - No useful log info) last year of a similar nature, but I'm pretty certain that this is different. Could it be that the sheer number of workers is creating some sort of race condition in amqplib? I found this thread which seems to indicate that amqplib is not thread-safe; I'm not sure if this matters for Celery.
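If amqplib really isn't thread-safe, the standard defence is one connection per thread instead of a shared one. A minimal sketch using plain amqplib (the credentials and vhost here are placeholders):

import threading
from amqplib import client_0_8 as amqp

_local = threading.local()  # per-thread storage, so no connection is ever shared

def get_channel(host="localhost", userid="guest",
                password="guest", vhost="/"):
    """Return a thread-local AMQP channel, creating the connection lazily."""
    if getattr(_local, "channel", None) is None:
        conn = amqp.Connection(host=host, userid=userid,
                               password=password, virtual_host=vhost)
        _local.channel = conn.channel()
    return _local.channel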
EDIT: I've tried celeryctl purge on both nodes -- on one it succeeds, but on the other it fails with the following AMQP error:
AMQPConnectionException(reply_code, reply_text, (class_id, method_id))
amqplib.client_0_8.exceptions.AMQPConnectionException:
(530, u"NOT_ALLOWED - cannot redeclare exchange 'XXXXX' in vhost 'XXXXX'
with different type, durable or autodelete value", (40, 10), 'Channel.exchange_declare')
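(For anyone else hitting the 530: it means an exchange_declare was issued with a type, durable, or auto_delete value different from the exchange that already exists on that vhost. A minimal amqplib sketch -- the exchange name and flags here are illustrative, compare yours against rabbitmqctl list_exchanges:)

from amqplib import client_0_8 as amqp

conn = amqp.Connection(host="localhost", userid="guest",
                       password="guest", virtual_host="/")
chan = conn.channel()
# The declaration must match the existing exchange exactly; any mismatch
# in type, durable, or auto_delete makes the broker refuse it with
# 530 NOT_ALLOWED, as in the error above.
chan.exchange_declare(exchange="celery", type="direct",
                      durable=True, auto_delete=False)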
On both nodes, inspect stats hangs with the "can't close connection" traceback above. I'm at a loss here.
EDIT2: I was able to delete the offending exchange using exchange.delete from camqadm, and now the second node hangs too :(.
EDIT3: One other thing that recently changed is that I added an additional vhost to RabbitMQ, which my staging node connects to.
The API defines a standard set of execution options, as well as three methods:
- apply_async(args[, kwargs[, …]]) -- sends a task message.
- delay(*args, **kwargs) -- a shortcut to send a task message, but does not support execution options.
- calling (__call__) -- runs the task inline in the current process; no message is sent to a worker.
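In code the difference looks like this -- a sketch using the Celery 2.x task decorator, with a hypothetical add task and illustrative options:

from celery.task import task

@task
def add(x, y):
    return x + y

add.delay(2, 2)  # just args/kwargs; no execution options

add.apply_async(args=[2, 2],
                countdown=10,          # execute no sooner than 10 seconds from now
                routing_key="celery")  # an execution option delay() cannot pass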
There is built-in support for JSON, YAML, pickle, and msgpack. Every task message is tagged with a content type, so you can even send one task using pickle and another using JSON. The default serializer used to be pickle, but since Celery 4.0 the default is JSON.
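Choosing the serializer per call is just another apply_async option; this continues the hypothetical add task from the sketch above (CELERY_TASK_SERIALIZER is the setting that controls the default):

# Continuing the hypothetical add task from the sketch above.
add.apply_async(args=[2, 2], serializer="json")    # this message is sent as JSON
add.apply_async(args=[2, 2], serializer="pickle")  # this one is pickled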
Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task, the client adds a message to the queue; the broker then delivers that message to a worker.
A broker is like a third-party facilitator between a buyer and a seller: Celery needs a way to send and receive messages, and this usually comes in the form of a separate service called a message broker -- RabbitMQ, Redis, and so on -- which conveys messages between clients and workers.
Hopefully this will save somebody a lot of time...though it certainly does not save me any embarrassment:
/var was full on the server that was running Rabbit. With all of the nodes that I added, Rabbit was doing a lot more logging, and it filled up /var -- I couldn't write to /var/lib/rabbitmq, and so no messages were going through.
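In hindsight, a few lines of stdlib Python would have caught this before the broker wedged. A sketch, assuming a POSIX host (os.statvfs is Unix-only; the path and 100MB threshold are illustrative):

import os

def free_megabytes(path="/var/lib/rabbitmq"):
    """Free space, in MB, on the filesystem that holds `path`."""
    st = os.statvfs(path)
    return st.f_bavail * st.f_frsize / (1024 * 1024)

if free_megabytes() < 100:  # arbitrary warning threshold
    print "WARNING: the broker's partition is nearly full"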