In our environment, we use RabbitMQ and Celery on AWS to run tasks in parallel over many nodes.
Recently we turned RabbitMQ into a cluster of 3 nodes, configured an HA policy, and added an AWS Elastic Load Balancer (ELB) listening on port 5672 in front of all 3 nodes. Our Celery workers and client code all use the ELB DNS name as the broker URL.
Since that change, we have noticed that waiting for async tasks to finish throws an exception: IOError: Socket closed.
The ELB shuts down all idle connections after 60 seconds, and we have tasks that take a few hours to complete.
Setting BROKER_HEARTBEAT to a value lower than 60 fixed the connection drops on the workers' end, but we can't find any setting that will keep the client connection alive.
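A minimal sketch of the worker-side broker settings that keep those connections alive (hostname, credentials, and the exact heartbeat value are placeholders; the point is only that the heartbeat interval is below the ELB's 60-second idle timeout):

# celeryconfig.py -- worker-side settings (sketch; hostname is a placeholder)
BROKER_URL = 'amqp://user:password@my-rabbitmq-elb.example.com:5672//'

# Send AMQP heartbeats more often than the ELB's 60-second idle timeout,
# so the worker <-> broker connection never looks idle to the ELB.
BROKER_HEARTBEAT = 30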
Is this the correct approach to wait for long running tasks with Celery?
One workaround we haven't tested yet is to retry the AsyncResult.wait() call until it returns successfully, for example:
async_result = task.delay(params)
while True:
    try:
        async_result.wait()
        break
    except IOError:
        # The ELB closed the idle result connection; retry the wait.
        pass
We use:
Celery is an open-source task queue written in Python. It's lightweight, supports multiple brokers (RabbitMQ, Redis, and Amazon SQS), and integrates with many web frameworks such as Django.
From my understanding, Celery is a distributed task queue, which means the only thing it should do is dispatch tasks/jobs to other servers and get the results back. RabbitMQ is a message queue, and nothing more. However, a worker could just listen to the queue and execute the task when a message is received.
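To make that division of labour concrete, here is a minimal sketch (broker/backend URLs and the task name are illustrative): Celery serializes the call into a message, RabbitMQ holds it, and whichever worker consumes it runs the function and sends the result back.

# tasks.py -- minimal sketch; URLs and task name are placeholders
from celery import Celery

app = Celery(
    'tasks',
    broker='amqp://user:password@rabbitmq-host:5672//',  # RabbitMQ holds the messages
    backend='rpc://',                                     # where results are sent back
)

@app.task
def add(x, y):
    # Runs on whichever worker consumes the message from the queue.
    return x + y

# Client side: this only publishes a message and returns immediately.
# result = add.delay(2, 3)
# print(result.get(timeout=10))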
celery-amqp-backend is a rewrite of Celery's original amqp:// result backend, which was removed from Celery in version 5.0. Celery encourages you to use the newer rpc:// result backend instead, as it does not create a new result queue for each task and is therefore faster in many circumstances.
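If you want to try the recommended rpc:// backend, the configuration is a one-liner (shown against the app object from the sketch above):

# Sketch: route results back over AMQP without a per-task result queue.
app.conf.result_backend = 'rpc://'
# Optional: persist result messages across broker restarts.
app.conf.result_persistent = True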
You can certainly use pika to implement a distributed task queue if you want, especially if you have a fairly simple use-case. Celery is just providing a "batteries included" solution for task scheduling, management, etc. that you'll have to manually implement if you decide you want them with your pika solution.
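For comparison, a bare-bones pika work-queue consumer looks roughly like this (queue name and handler are illustrative); everything Celery gives you on top of it, such as retries, results, scheduling, and monitoring, you would have to build around this loop yourself:

# consume.py -- minimal pika work-queue consumer (sketch)
import pika

def handle_task(ch, method, properties, body):
    print('received task payload:', body)
    # ... do the actual work here ...
    ch.basic_ack(delivery_tag=method.delivery_tag)  # ack only after the work succeeds

connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq-host'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_qos(prefetch_count=1)          # hand each worker one unacked task at a time
channel.basic_consume(queue='task_queue', on_message_callback=handle_task)
channel.start_consuming()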
I believe what you need to do is extend the idle timeout on the AWS ELB. What's happening is that the connection is being closed before the task is complete. You can accomplish this by issuing the following command:
elb-modify-lb-attributes myTestELB --connection-settings "idletimeout=3600" --headers
This would give you an hour to complete the task. See https://aws.amazon.com/blogs/aws/elb-idle-timeout-control/ for more info on this.
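That command uses the older standalone ELB CLI tools; if you'd rather do it from Python, the equivalent call through boto3 should look roughly like this (the load balancer name is a placeholder, and this assumes a Classic ELB):

# Sketch: raise the Classic ELB idle timeout to one hour via boto3.
import boto3

elb = boto3.client('elb')
elb.modify_load_balancer_attributes(
    LoadBalancerName='myTestELB',
    LoadBalancerAttributes={'ConnectionSettings': {'IdleTimeout': 3600}},
)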
If an hour is not enough, then you're going to have to disable connection pooling. Add these two settings to your Celery config:
BROKER_POOL_LIMIT = None
BROKER_TRANSPORT_OPTIONS = {'confirm_publish': True}
The second setting will have a performance hit since it adds some overhead, but since you have long-running tasks this may not be an issue. It may not be strictly necessary either, but I would recommend it given that you're behind a load balancer: publisher confirms make sure messages are actually received by the broker and not lost along the way.
Another option is to break your long task into smaller tasks. This may mean more code, but it may be worth it in the long run.
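For example, a chain of shorter tasks keeps any single connection from sitting idle for hours while still letting you wait on the final result (the task names here are made up for illustration):

# Sketch: split one multi-hour task into a chain of shorter steps.
from celery import chain

# Each step receives the previous step's return value as its first argument.
workflow = chain(extract.s(params), transform.s(), load.s())
result = workflow.apply_async()
print(result.get())  # blocks only until the final step finishes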