I am getting this error when using a Scrapy parsing function (which can sometimes take up to 10 minutes) inside a Celery task.
I use:

- Django==1.6.5
- django-celery==3.1.16
- celery==3.1.16
- psycopg2==2.5.5 (I also tried psycopg2==2.5.4)
[2015-07-19 11:27:49,488: CRITICAL/MainProcess] Task myapp.parse_items[63fc40eb-c0d6-46f4-a64e-acce8301d29a] INTERNAL ERROR: InterfaceError('connection already closed',)
Traceback (most recent call last):
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/celery/app/trace.py", line 284, in trace_task
    uuid, retval, SUCCESS, request=task_request,
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/celery/backends/base.py", line 248, in store_result
    request=request, **kwargs)
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/djcelery/backends/database.py", line 29, in _store_result
    traceback=traceback, children=self.current_task_children(request),
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/djcelery/managers.py", line 42, in _inner
    return fun(*args, **kwargs)
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/djcelery/managers.py", line 181, in store_result
    'meta': {'children': children}})
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/djcelery/managers.py", line 87, in update_or_create
    return get_queryset(self).update_or_create(**kwargs)
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/djcelery/managers.py", line 70, in update_or_create
    obj, created = self.get_or_create(**kwargs)
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py", line 376, in get_or_create
    return self.get(**lookup), False
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py", line 304, in get
    num = len(clone)
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py", line 77, in __len__
    self._fetch_all()
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py", line 857, in _fetch_all
    self._result_cache = list(self.iterator())
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py", line 220, in iterator
    for row in compiler.results_iter():
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 713, in results_iter
    for rows in self.execute_sql(MULTI):
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 785, in execute_sql
    cursor = self.connection.cursor()
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/backends/__init__.py", line 160, in cursor
    cursor = self.make_debug_cursor(self._cursor())
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/backends/__init__.py", line 134, in _cursor
    return self.create_cursor()
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/utils.py", line 99, in __exit__
    six.reraise(dj_exc_type, dj_exc_value, traceback)
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/backends/__init__.py", line 134, in _cursor
    return self.create_cursor()
  File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/backends/postgresql_psycopg2/base.py", line 137, in create_cursor
    cursor = self.connection.cursor()
InterfaceError: connection already closed
Unfortunately this is a problem with the django + psycopg2 + celery combo. It's an old and unsolved problem.
Take a look at this thread to understand it: https://github.com/celery/django-celery/issues/121
Basically, when celery starts a worker, it forks a database connection from the django.db framework. If this connection drops for some reason, it doesn't create a new one. Celery can't do anything about this, because there is no way to detect that the database connection has dropped using the django.db libraries. Django doesn't notify you when it happens, because it just starts a connection when it receives a WSGI call (there is no connection pool). I had the same problem in a huge production environment with a lot of worker machines, and sometimes these machines lost connectivity with the postgres server.
I solved it by putting each celery master process under a linux supervisord handler with a watcher, and by implementing a decorator that handles psycopg2.InterfaceError: when the error occurs, the decorator dispatches a syscall that sends SIGINT to the celery process, so it shuts down gracefully and supervisord restarts it.
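For illustration only, a minimal sketch of such a decorator might look like the following. This is my reconstruction, not the author's code: restart_worker_on_dead_connection is a hypothetical name, and os.kill(os.getpid(), ...) signals the current process, whereas the author presumably targeted the celery master process that supervisord watches.

import functools
import os
import signal

import psycopg2


def restart_worker_on_dead_connection(func):
    """Hypothetical decorator: if the wrapped task body hits a dead
    psycopg2 connection, SIGINT this process so that supervisord
    restarts it with a fresh database connection."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except psycopg2.InterfaceError:
            # Ask celery to shut down gracefully (same as Ctrl-C);
            # supervisord will bring the process back up.
            os.kill(os.getpid(), signal.SIGINT)
            raise
    return wrapper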
Edit:
Found a better solution. I implemented a celery task base class like this:
from django.db import connection
import celery


class FaultTolerantTask(celery.Task):
    """Implements the after_return hook to close the invalid connection.
    This way, django is forced to serve a new connection for the next task.
    """
    abstract = True

    def after_return(self, *args, **kwargs):
        connection.close()


@celery.task(base=FaultTolerantTask)
def my_task():
    # my database dependent code here
    pass
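Note that connection here is only the default database connection; if you use multiple databases, you could close every entry in django.db.connections instead, as the loader-based answer below does. Closing the connection is safe even when no query has run, and Django transparently opens a fresh connection on the next query.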
I believe it will fix your problem too.
Guys and emanuelcds,
I had the same problem; I have now updated my code and created a custom loader for celery:
from djcelery.loaders import DjangoLoader
from django import db


class CustomDjangoLoader(DjangoLoader):
    def on_task_init(self, task_id, task):
        """Called before every task."""
        for conn in db.connections.all():
            conn.close_if_unusable_or_obsolete()
        super(CustomDjangoLoader, self).on_task_init(task_id, task)
This of course only applies if you are using djcelery. It will also require something like this in your settings:
import os

CELERY_LOADER = 'myproject.loaders.CustomDjangoLoader'
os.environ['CELERY_LOADER'] = CELERY_LOADER
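As far as I can tell, the environment variable is needed in addition to the settings value because celery determines its loader class from the CELERY_LOADER environment variable rather than from Django settings, so setting only the settings variable may not be enough.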
I still have to test it; I will update.