celery .delay hangs (recent, not an auth problem)

I am running Celery 2.2.4/djCelery 2.2.4, using RabbitMQ 2.1.1 as a backend. I recently brought two new celery servers online. I had been running 2 workers across two machines with a total of ~18 threads; on my new souped-up boxes (36 GB RAM + dual hyper-threaded quad-core), I am running 10 workers with 8 threads each, for a total of 180 threads. My tasks are all pretty small, so this should be fine.

The nodes have been running fine for the last few days, but today I noticed that .delay() is hanging. When I interrupt it, I see a traceback that points here:

File "/home/django/deployed/releases/20110608183345/virtual-env/lib/python2.5/site-packages/celery/task/base.py", line 324, in delay
    return self.apply_async(args, kwargs)
File "/home/django/deployed/releases/20110608183345/virtual-env/lib/python2.5/site-packages/celery/task/base.py", line 449, in apply_async
    publish.close()
File "/home/django/deployed/virtual-env/lib/python2.5/site-packages/kombu/compat.py", line 108, in close
    self.backend.close()
File "/home/django/deployed/virtual-env/lib/python2.5/site-packages/amqplib/client_0_8/channel.py", line 194, in close
    (20, 41),    # Channel.close_ok
File "/home/django/deployed/virtual-env/lib/python2.5/site-packages/amqplib/client_0_8/abstract_channel.py", line 89, in wait
    self.channel_id, allowed_methods)
File "/home/django/deployed/virtual-env/lib/python2.5/site-packages/amqplib/client_0_8/connection.py", line 198, in _wait_method
    self.method_reader.read_method()
File "/home/django/deployed/virtual-env/lib/python2.5/site-packages/amqplib/client_0_8/method_framing.py", line 212, in read_method
    self._next_method()
File "/home/django/deployed/virtual-env/lib/python2.5/site-packages/amqplib/client_0_8/method_framing.py", line 127, in _next_method
    frame_type, channel, payload = self.source.read_frame()
File "/home/django/deployed/virtual-env/lib/python2.5/site-packages/amqplib/client_0_8/transport.py", line 109, in read_frame
    frame_type, channel, size = unpack('>BHI', self._read(7))
File "/home/django/deployed/virtual-env/lib/python2.5/site-packages/amqplib/client_0_8/transport.py", line 200, in _read
    s = self.sock.recv(65536)
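
The traceback bottoms out in a blocking sock.recv: the client has sent Channel.close and is waiting for a close_ok reply that never arrives. The following is a minimal sketch of that behaviour, not amqplib's actual code. It assumes a local socket pair standing in for the broker connection, and recv_with_timeout is a hypothetical helper written only for this illustration. It shows that a read with a timeout returns control to the caller instead of hanging when the peer stays silent.

```python
import socket


def recv_with_timeout(sock, nbytes, timeout):
    """Return the received bytes, or None if nothing arrives within `timeout` seconds.

    Hypothetical helper for illustration; it is not part of amqplib or Celery.
    """
    sock.settimeout(timeout)
    try:
        return sock.recv(nbytes)
    except socket.timeout:
        # The peer sent nothing in time. Without a timeout, recv would block here forever.
        return None


def demo():
    # A connected pair of local sockets stands in for client and broker (assumption).
    client, broker = socket.socketpair()
    try:
        # The "broker" is silent, so the read gives up after the timeout.
        silent = recv_with_timeout(client, 7, 0.1)
        # Once the "broker" answers, the same read succeeds.
        broker.sendall(b"1234567")
        answered = recv_with_timeout(client, 7, 0.1)
        return silent, answered
    finally:
        client.close()
        broker.close()
```

Calling demo() returns one result for the silent peer and one for the peer that replied, which makes it easy to tell a broker that is unreachable from one that accepts the connection but never responds.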

I've checked the Rabbit logs, and I see the process trying to connect:

=INFO REPORT==== 12-Jun-2011::22:58:12 ===
accepted TCP connection on 0.0.0.0:5672 from x.x.x.x:48569

I have my Celery log level set to INFO, but I don't see anything particularly interesting in the Celery logs EXCEPT that 2 of the workers can't connect to the broker:

[2011-06-12 22:41:08,033: ERROR/MainProcess] Consumer: Connection to broker lost. Trying to re-establish connection...

All of the other nodes are able to connect without issue.

I know that there was a posting ( RabbitMQ / Celery with Django hangs on delay/ready/etc - No useful log info ) last year of a similar nature, but I'm pretty certain that this is different. Could it be that the sheer number of workers is creating some sort of race condition in amqplib? I found this thread, which seems to indicate that amqplib is not thread-safe, but I'm not sure whether this matters for Celery.

EDIT: I've tried celeryctl purge on both nodes -- on one it succeeds, but on the other it fails with the following AMQP error:

AMQPConnectionException(reply_code, reply_text, (class_id, method_id))
    amqplib.client_0_8.exceptions.AMQPConnectionException: 
    (530, u"NOT_ALLOWED - cannot redeclare exchange 'XXXXX' in vhost 'XXXXX' 
     with different type, durable or autodelete   value", (40, 10), 'Channel.exchange_declare')

On both nodes, inspect stats hangs with the "can't close connection" traceback above. I'm at a loss here.

EDIT2: I was able to delete the offending exchange using exchange.delete from camqadm and now the second node hangs too :(.

EDIT3: One thing that also recently changed is that I added an additional vhost to rabbitmq, which my staging node connects to.

Bacon asked Jun 13 '11 02:06


1 Answer

Hopefully this will save somebody a lot of time...though it certainly does not save me any embarrassment:

/var was full on the server that was running rabbit. With all of the nodes that I added, rabbit was doing a lot more logging and it filled up /var -- I couldn't write to /var/lib/rabbitmq, and so no messages were going through.
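
A quick check like the one below might have caught this sooner. It is only a sketch under stated assumptions: it uses shutil.disk_usage, which needs Python 3.3 or later (on the Python 2.5 install in the question, os.statvfs would be the rough equivalent), and the helper names and the 5% threshold are hypothetical choices for illustration.

```python
import shutil


def free_fraction(path):
    """Return the fraction (0.0 to 1.0) of the filesystem holding `path` that is free."""
    usage = shutil.disk_usage(path)
    if usage.total == 0:
        # Some pseudo-filesystems report zero size; treat them as having no free space.
        return 0.0
    return usage.free / usage.total


def is_nearly_full(path, min_free=0.05):
    """True when less than `min_free` of the filesystem is free (threshold is an assumption)."""
    return free_fraction(path) < min_free


def report(paths):
    """Return a list of (path, free_fraction, nearly_full) tuples for the given paths."""
    return [(p, free_fraction(p), is_nearly_full(p)) for p in paths]
```

On the broker host, calling report(["/var", "/var/lib/rabbitmq"]) from a cron job or monitoring script would flag the partition before rabbit stops being able to write.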

Bacon answered Oct 04 '22 22:10