Celery Closes Unexpectedly After Longer Inactivity

I am using RabbitMQ and Celery to build a simple RPC architecture. I have one RabbitMQ message broker and one remote worker that runs the Celery daemon.

There is a third server which exposes a thin RESTful API. When it receives an HTTP request, it sends a task to the remote worker, waits for the result, and returns it in the HTTP response.

This works great most of the time. However, I have noticed that after a longer period of inactivity (say 5 minutes with no incoming requests), the Celery worker behaves strangely. The first 3 tasks received after the idle period fail with this error:

exchange.declare: connection closed unexpectedly

After three failed tasks it works again. If there are no tasks for a longer period of time, the same thing happens. Any idea?

My init script for the Celery worker:

# description "Celery worker using sync broker"

console log

start on runlevel [2345]
stop on runlevel [!2345]

setuid richard
setgid richard

script
chdir /usr/local/myproject/myproject
exec /usr/local/myproject/venv/bin/celery worker -n celery_worker_deamon.%h -A proj.sync_celery -Q sync_queue -l info --autoscale=10,3 --autoreload --purge
end script

respawn

My celery config:

# Synchronous blocking tasks
BROKER_URL_SYNC = 'amqp://guest:guest@localhost:5672//'
# Asynchronous non blocking tasks
BROKER_URL_ASYNC = 'amqp://guest:guest@localhost:5672//'

#: Only add pickle to this list if your broker is secured
#: from unwanted access (see userguide/security.html)
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
CELERY_ENABLE_UTC = True
CELERY_BACKEND = 'amqp'

# http://docs.celeryproject.org/en/latest/userguide/tasks.html#disable-rate-limits-if-they-re-not-used
CELERY_DISABLE_RATE_LIMITS = True

# http://docs.celeryproject.org/en/latest/userguide/routing.html
CELERY_DEFAULT_QUEUE = 'sync_queue'
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_DEFAULT_ROUTING_KEY = "sync_task.default"
CELERY_QUEUES = {
    'sync_queue': {
        'binding_key':'sync_task.#',
    },
    'async_queue': {
        'binding_key':'async_task.#',
    },
}

Any ideas?

EDIT:

Ok, now it appears to happen randomly. I noticed this in RabbitMQ logs:

=WARNING REPORT==== 6-Jan-2014::17:31:54 ===
closing AMQP connection <0.295.0> (some_ip_address:36842 -> some_ip_address:5672):
connection_closed_abruptly
Richard Knop asked Jan 06 '14 17:01


2 Answers

Is your RabbitMQ server or your Celery worker behind a load balancer by any chance? If so, the load balancer is probably closing the TCP connection after some period of inactivity, in which case you will have to enable heartbeats from the client (worker) side. If you do, I would not recommend using the pure Python amqp lib for this. Instead, replace it with librabbitmq.

pbhowmick answered Oct 11 '22 03:10


The connection_closed_abruptly warning is logged when a client disconnects without completing the proper AMQP shutdown protocol:

channel.close(...)

Request a channel close.

This method indicates that the sender wants to close the channel. This may be due to internal conditions (e.g. a forced shut-down) or due to an error handling a specific method, i.e. an exception. When a close is due to an exception, the sender provides the class and method id of the method which caused the exception.

After sending this method, any received methods except Close and Close-OK MUST be discarded. The response to receiving a Close after sending Close must be to send Close-Ok.

channel.close-ok():

Confirm a channel close.

This method confirms a Channel.Close method and tells the recipient that it is safe to release resources for the channel.

A peer that detects a socket closure without having received a Channel.Close-Ok handshake method SHOULD log the error.

Here is an issue about that.
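To make the close handshake quoted above more concrete, here is a toy model in Python. It is only a sketch of the rules as quoted, not the code of any real AMQP client: the Channel class, its method names, and the reuse of the string connection_closed_abruptly as a return value are all made up for illustration.

```python
# Toy model of the AMQP channel close handshake (illustration only,
# not a real client library; all names here are hypothetical).
class Channel:
    def __init__(self):
        self.state = "open"   # "open" -> "closing" -> "closed"
        self.sent = []        # methods this peer has sent

    def close(self):
        """Initiate a close: send Close, then wait for Close-Ok."""
        self.sent.append("Close")
        self.state = "closing"

    def receive(self, method):
        """Handle an incoming method; return True if it was processed."""
        if self.state == "closed":
            return False
        if self.state == "closing":
            if method == "Close-Ok":
                self.state = "closed"          # now safe to release resources
                return True
            if method == "Close":
                self.sent.append("Close-Ok")   # peer is closing too: answer it
                return True
            return False                       # everything else is discarded
        if method == "Close":
            self.sent.append("Close-Ok")       # confirm the peer's close
            self.state = "closed"
            return True
        return True                            # normal traffic on an open channel

    def socket_lost(self):
        """The socket went away; report whether the handshake had finished."""
        clean = self.state == "closed"
        self.state = "closed"
        return "clean" if clean else "connection_closed_abruptly"
```

A peer that calls close() and then receives Close-Ok ends up in the clean case. A peer whose socket disappears while the channel is still open, which is what an idle timeout somewhere in the network path would look like, ends up in the abrupt case that the receiving side should log.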

Can you set custom values for BROKER_HEARTBEAT and BROKER_HEARTBEAT_CHECKRATE and check again? For example:

BROKER_HEARTBEAT = 10 
BROKER_HEARTBEAT_CHECKRATE = 2.0
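
As far as I understand the Celery documentation, the check rate is a divisor: the worker checks for missed heartbeats every BROKER_HEARTBEAT / BROKER_HEARTBEAT_CHECKRATE seconds. The small sketch below just does that arithmetic and compares the heartbeat with an idle timeout. The helper names are hypothetical, and the 300 second timeout is only an assumption based on the "5 minutes" mentioned in the question, not a measured value.

```python
def heartbeat_check_interval(heartbeat, checkrate=2.0):
    """Seconds between heartbeat checks: the heartbeat divided by the check rate."""
    return heartbeat / checkrate


def heartbeat_beats_idle_timeout(heartbeat, idle_timeout):
    """True if heartbeats are sent more often than the idle timeout fires."""
    return heartbeat < idle_timeout


# Values from the example settings above.
BROKER_HEARTBEAT = 10
BROKER_HEARTBEAT_CHECKRATE = 2.0

# Assumed idle timeout of some intermediate device, in seconds (hypothetical).
ASSUMED_IDLE_TIMEOUT = 300

interval = heartbeat_check_interval(BROKER_HEARTBEAT, BROKER_HEARTBEAT_CHECKRATE)
keeps_alive = heartbeat_beats_idle_timeout(BROKER_HEARTBEAT, ASSUMED_IDLE_TIMEOUT)
```

With these values the check runs every 5 seconds, and a 10 second heartbeat is far shorter than the assumed idle timeout, so the connection should never look idle to anything sitting between the worker and the broker.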
Omid Raha answered Oct 11 '22 03:10