 

Celery creating a new connection for each task


I'm using Celery with Redis to run some background tasks, but each time a task is called, it creates a new connection to Redis. I'm on Heroku and my Redis to Go plan allows for 10 connections. I'm quickly hitting that limit and getting a "max number of clients reached" error.

How can I ensure that Celery queues the tasks on a single connection rather than opening a new one each time?

EDIT - including the full traceback

File "/app/.heroku/venv/lib/python2.7/site-packages/django/core/handlers/base.py", line 111, in get_response    response = callback(request, *callback_args, **callback_kwargs)   File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/object_wrapper.py", line 166, in __call__    self._nr_instance, args, kwargs)   File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/hooks/framework_django.py", line 447, in wrapper    return wrapped(*args, **kwargs)   File "/app/.heroku/venv/lib/python2.7/site-packages/django/views/decorators/csrf.py", line 77, in wrapped_view    return view_func(*args, **kwargs)   File "/app/feedback/views.py", line 264, in zencoder_webhook_handler    tasks.process_zencoder_notification.delay(webhook)   File "/app/.heroku/venv/lib/python2.7/site-packages/celery/app/task.py", line 343, in delay    return self.apply_async(args, kwargs)   File "/app/.heroku/venv/lib/python2.7/site-packages/celery/app/task.py", line 458, in apply_async    with app.producer_or_acquire(producer) as P:   File "/usr/local/lib/python2.7/contextlib.py", line 17, in __enter__    return self.gen.next()   File "/app/.heroku/venv/lib/python2.7/site-packages/celery/app/base.py", line 247, in producer_or_acquire    with self.amqp.producer_pool.acquire(block=True) as producer:   File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 705, in acquire    R = self.prepare(R)   File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/pools.py", line 54, in prepare    p = p()   File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/pools.py", line 45, in <lambda>    return lambda: self.create_producer()   File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/pools.py", line 42, in create_producer    return self.Producer(self._acquire_connection())   File "/app/.heroku/venv/lib/python2.7/site-packages/celery/app/amqp.py", line 160, in __init__    super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)   File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/messaging.py", line 83, in __init__    self.revive(self.channel)   File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/messaging.py", line 174, in revive    channel = self.channel = maybe_channel(channel)   File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 879, in maybe_channel    return channel.default_channel   File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 617, in default_channel    self.connection   File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 610, in connection    self._connection = self._establish_connection()   File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 569, in _establish_connection    conn = self.transport.establish_connection()   File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/transport/virtual/__init__.py", line 722, in establish_connection    self._avail_channels.append(self.create_channel(self))   File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/transport/virtual/__init__.py", line 705, in create_channel    channel = self.Channel(connection)   File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/transport/redis.py", line 271, in __init__    self.client.info()   File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/object_wrapper.py", line 166, in __call__    self._nr_instance, args, kwargs)   File 
"/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/function_trace.py", line 81, in literal_wrapper    return wrapped(*args, **kwargs)   File "/app/.heroku/venv/lib/python2.7/site-packages/redis/client.py", line 344, in info    return self.execute_command('INFO')   File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/transport/redis.py", line 536, in execute_command    conn.send_command(*args)   File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 273, in send_command    self.send_packed_command(self.pack_command(*args))   File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 256, in send_packed_command    self.connect()   File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/object_wrapper.py", line 166, in __call__    self._nr_instance, args, kwargs)   File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/function_trace.py", line 81, in literal_wrapper    return wrapped(*args, **kwargs)   File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 207, in connect    self.on_connect()   File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 233, in on_connect    if self.read_response() != 'OK':   File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 283, in read_response    raise response  ResponseError: max number of clients reached 
Mike asked Aug 17 '12 21:08


People also ask

Can a Celery task call another task?

To answer your opening questions: As of version 2.0, Celery provides an easy way to start tasks from other tasks. What you are calling "secondary tasks" are what it calls "subtasks".
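For illustration, here is a minimal sketch of one task launching another; the app setup, broker URL, and task names are assumptions for the example, not taken from the question:

    from celery import Celery

    app = Celery('example', broker='redis://localhost:6379/0')  # assumed broker URL

    @app.task
    def notify_user(video_id):
        print('finished processing video %s' % video_id)

    @app.task
    def process_video(video_id):
        # ... do the main work, then queue a follow-up ("secondary") task;
        # .delay() only sends a message to the broker, it does not block.
        notify_user.delay(video_id)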

How does Celery execute tasks?

Introduction. Celery is a task queue/job queue based on asynchronous message passing. It can be used as a background task processor for your application in which you dump your tasks to execute in the background or at any given moment. It can be configured to execute your tasks synchronously or asynchronously.

How does the Celery task queue work?

Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task, the Celery client adds a message to the queue, and the broker then delivers that message to a worker. The most commonly used brokers are Redis and RabbitMQ.
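As a rough sketch of that flow, assuming a Redis broker on localhost (the module name, task, and URL are illustrative only):

    # tasks.py
    from celery import Celery

    app = Celery('tasks', broker='redis://localhost:6379/0')

    @app.task
    def add(x, y):
        return x + y

    # Client side: .delay() just serializes a message and hands it to the broker;
    # a worker started with `celery -A tasks worker` consumes and executes it.
    result = add.delay(2, 3)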


1 Answer

I ran into the same problem on Heroku with CloudAMQP. I do not know why, but I had no luck when assigning low integers to the BROKER_POOL_LIMIT setting.

Ultimately, I found that setting BROKER_POOL_LIMIT=None or BROKER_POOL_LIMIT=0 mitigated the issue. According to the Celery docs, this disables the connection pool. So far this has not caused any noticeable problems for me, though your mileage may vary.

Link to relevant info: http://celery.readthedocs.org/en/latest/configuration.html#broker-pool-limit
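For reference, this is roughly how that looks in a Django settings module or celeryconfig; this is a sketch, and the environment variable name and fallback URL are assumptions about a typical Redis To Go setup on Heroku:

    # settings.py / celeryconfig.py -- sketch; env var name and fallback URL are assumptions
    import os

    BROKER_URL = os.environ.get('REDISTOGO_URL', 'redis://localhost:6379/0')

    # Per the Celery docs, None (or 0) disables the broker connection pool,
    # so connections are opened on demand instead of being held open in a pool.
    BROKER_POOL_LIMIT = None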

jnishiyama answered Oct 09 '22 10:10