I'm using Celery with Redis to run some background tasks, but each time a task is called, it creates a new connection to Redis. I'm on Heroku and my Redis to Go plan allows for 10 connections. I'm quickly hitting that limit and getting a "max number of clients reached" error.
How can I ensure that Celery queues the tasks on a single connection rather than opening a new one each time?
EDIT - including the full traceback
  File "/app/.heroku/venv/lib/python2.7/site-packages/django/core/handlers/base.py", line 111, in get_response
    response = callback(request, *callback_args, **callback_kwargs)
  File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/object_wrapper.py", line 166, in __call__
    self._nr_instance, args, kwargs)
  File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/hooks/framework_django.py", line 447, in wrapper
    return wrapped(*args, **kwargs)
  File "/app/.heroku/venv/lib/python2.7/site-packages/django/views/decorators/csrf.py", line 77, in wrapped_view
    return view_func(*args, **kwargs)
  File "/app/feedback/views.py", line 264, in zencoder_webhook_handler
    tasks.process_zencoder_notification.delay(webhook)
  File "/app/.heroku/venv/lib/python2.7/site-packages/celery/app/task.py", line 343, in delay
    return self.apply_async(args, kwargs)
  File "/app/.heroku/venv/lib/python2.7/site-packages/celery/app/task.py", line 458, in apply_async
    with app.producer_or_acquire(producer) as P:
  File "/usr/local/lib/python2.7/contextlib.py", line 17, in __enter__
    return self.gen.next()
  File "/app/.heroku/venv/lib/python2.7/site-packages/celery/app/base.py", line 247, in producer_or_acquire
    with self.amqp.producer_pool.acquire(block=True) as producer:
  File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 705, in acquire
    R = self.prepare(R)
  File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/pools.py", line 54, in prepare
    p = p()
  File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/pools.py", line 45, in <lambda>
    return lambda: self.create_producer()
  File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/pools.py", line 42, in create_producer
    return self.Producer(self._acquire_connection())
  File "/app/.heroku/venv/lib/python2.7/site-packages/celery/app/amqp.py", line 160, in __init__
    super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)
  File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/messaging.py", line 83, in __init__
    self.revive(self.channel)
  File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/messaging.py", line 174, in revive
    channel = self.channel = maybe_channel(channel)
  File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 879, in maybe_channel
    return channel.default_channel
  File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 617, in default_channel
    self.connection
  File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 610, in connection
    self._connection = self._establish_connection()
  File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 569, in _establish_connection
    conn = self.transport.establish_connection()
  File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/transport/virtual/__init__.py", line 722, in establish_connection
    self._avail_channels.append(self.create_channel(self))
  File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/transport/virtual/__init__.py", line 705, in create_channel
    channel = self.Channel(connection)
  File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/transport/redis.py", line 271, in __init__
    self.client.info()
  File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/object_wrapper.py", line 166, in __call__
    self._nr_instance, args, kwargs)
  File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/function_trace.py", line 81, in literal_wrapper
    return wrapped(*args, **kwargs)
  File "/app/.heroku/venv/lib/python2.7/site-packages/redis/client.py", line 344, in info
    return self.execute_command('INFO')
  File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/transport/redis.py", line 536, in execute_command
    conn.send_command(*args)
  File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 273, in send_command
    self.send_packed_command(self.pack_command(*args))
  File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 256, in send_packed_command
    self.connect()
  File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/object_wrapper.py", line 166, in __call__
    self._nr_instance, args, kwargs)
  File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/function_trace.py", line 81, in literal_wrapper
    return wrapped(*args, **kwargs)
  File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 207, in connect
    self.on_connect()
  File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 233, in on_connect
    if self.read_response() != 'OK':
  File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 283, in read_response
    raise response
ResponseError: max number of clients reached
Celery is a task queue (job queue) based on asynchronous message passing. An application can use it as a background task processor: tasks are handed off to run in the background, either immediately or at a scheduled time. Tasks can be executed synchronously or asynchronously.
Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task, the Celery client adds a message to the queue, and the broker then delivers that message to a worker. The most commonly used brokers are Redis and RabbitMQ.
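The message flow described above can be sketched with the standard library alone. This is not Celery code: a `queue.Queue` stands in for the broker, a thread stands in for the worker process, and the `TASKS` registry and `add` task are hypothetical names made up for the illustration.

```python
import queue
import threading

broker = queue.Queue()   # stands in for the broker (Redis or RabbitMQ)
results = []             # collects what the worker computed


def add(x, y):
    # Hypothetical task body.
    return x + y


# Hypothetical registry mapping task names to functions.
TASKS = {"add": add}


def worker():
    # The worker takes messages off the queue and runs the named task.
    while True:
        message = broker.get()
        if message is None:   # sentinel value: stop the worker
            break
        name, args = message
        results.append(TASKS[name](*args))


thread = threading.Thread(target=worker)
thread.start()

# The client does not call add() itself; it only enqueues a message.
broker.put(("add", (2, 3)))
broker.put(None)
thread.join()

print(results)  # [5]
```

In real Celery the client and the worker are separate processes and the queue lives in the broker, which is why every publishing process needs its own broker connection.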
I ran into the same problem on Heroku with CloudAMQP. I do not know why, but I had no luck when assigning low integers to the BROKER_POOL_LIMIT setting. Ultimately, I found that setting BROKER_POOL_LIMIT=None or BROKER_POOL_LIMIT=0 mitigated my issue. According to the Celery docs, this disables the connection pool, so connections are established and closed for every use. So far this has not caused a noticeable problem for me, but I'm not sure whether it might for you.
Link to relevant info: http://celery.readthedocs.org/en/latest/configuration.html#broker-pool-limit
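As a minimal sketch, the setting could sit in the Django settings module (or a celeryconfig.py) like this. The BROKER_URL line is an assumption added for context: it reads the REDISTOGO_URL variable that the Redis To Go add-on sets on Heroku and falls back to a local Redis for development. Only BROKER_POOL_LIMIT comes from the answer above.

```python
# Celery settings using the old-style uppercase names from the answer above.
import os

# Assumption: broker URL taken from the Redis To Go add-on's environment
# variable; the localhost fallback is only meant for local development.
BROKER_URL = os.environ.get("REDISTOGO_URL", "redis://localhost:6379/0")

# None (or 0) disables the broker connection pool, so a connection is
# opened and closed for each use instead of being held open in a pool.
BROKER_POOL_LIMIT = None
```

Whether this stays under a 10-connection plan still depends on how many web and worker processes are running at once, since each of them connects on its own.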