I have a problem with every insert query (even a very small one) that is executed asynchronously in Celery tasks. In synchronous mode the inserts work fine, but when the task is executed via apply_async() I get this:
OperationTimedOut('errors=errors=errors={}, last_host=***.***.*.***, last_host=None, last_host=None',)
Traceback:
Traceback (most recent call last):
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/celery/app/trace.py", line 437, in __protected_call__
    return self.run(*args, **kwargs)
  File "/var/nfs_www/***/www_v1/app/mods/news_feed/tasks.py", line 26, in send_new_comment_reply_notifications
    send_new_comment_reply_notifications_method(comment_id)
  File "/var/nfs_www/***www_v1/app/mods/news_feed/methods.py", line 83, in send_new_comment_reply_notifications
    comment_type='comment_reply'
  File "/var/nfs_www/***/www_v1/app/mods/news_feed/models/storage.py", line 129, in add
    CommentsFeed(**kwargs).save()
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/models.py", line 531, in save
    consistency=self.__consistency__).save()
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/query.py", line 907, in save
    self._execute(insert)
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/query.py", line 786, in _execute
    tmp = execute(q, consistency_level=self._consistency)
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/connection.py", line 95, in execute
    result = session.execute(query, params)
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cassandra/cluster.py", line 1103, in execute
    result = future.result(timeout)
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cassandra/cluster.py", line 2475, in result
    raise OperationTimedOut(errors=self._errors, last_host=self._current_host)
OperationTimedOut: errors={}, last_host=***.***.*.***
Does anyone have an idea what the problem is?
I found this question: "When cassandra-driver was executing the query, cassandra-driver returned error OperationTimedOut", but my query is very small and the problem occurs only in Celery tasks.
UPDATE:
I made a test task and it raises this error too.
@celery.task()
def test_task_with_cassandra():
    from app import cassandra_session
    cassandra_session.execute('use news_feed')
    return 'Done'
UPDATE 2: I tried this:
@celery.task()
def test_task_with_cassandra():
    from cqlengine import connection
    connection.setup(app.config['CASSANDRA_SERVERS'], port=app.config['CASSANDRA_PORT'],
                     default_keyspace='test_keyspace')
    from .models import Feed
    Feed.objects.count()
    return 'Done'
Got this:
NoHostAvailable('Unable to connect to any servers', {'***.***.*.***': OperationTimedOut('errors=errors=Timed out creating connection, last_host=None, last_host=None',)})
From the shell I can connect to it.
UPDATE 3: From a deleted thread on a GitHub issue (I found this in my emails; it worked for me too):
Here's how, in substance, I plug CQLengine into Celery:
from celery import Celery
from celery.signals import worker_process_init, beat_init
from cqlengine import connection


def cassandra_init(**kwargs):
    """ Initialize a clean Cassandra connection. """
    # Celery calls signal handlers with keyword arguments, hence **kwargs.
    # Read cluster/session from the module at call time, not at import time,
    # so that a connection inherited from the parent process is seen here.
    if connection.cluster is not None:
        connection.cluster.shutdown()
    if connection.session is not None:
        connection.session.shutdown()
    # Pass your hosts and default keyspace here.
    connection.setup()


# Initialize worker context for both standard and periodic tasks.
worker_process_init.connect(cassandra_init)
beat_init.connect(cassandra_init)

app = Celery()
This is crude, but it works. Should we add this snippet to the FAQ?
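The idea behind the snippet above is that every forked worker process needs its own connection rather than one inherited from the parent. Where hooking Celery signals is not convenient, the same idea can be sketched with a process-ID check. This is only a minimal sketch using the standard library: `PerProcessSession` and `factory` are hypothetical names, not part of cqlengine, Celery, or the Cassandra driver, and unlike the snippet above it does not shut down the inherited connection, it simply stops using it.

```python
import os


class PerProcessSession(object):
    """Cache one session per process and rebuild it when the PID changes.

    `factory` is any callable that opens and returns a new session
    (hypothetical; with the real driver it would wrap the connect call).
    """

    def __init__(self, factory):
        self._factory = factory
        self._session = None
        self._pid = None

    def get(self):
        pid = os.getpid()
        if self._session is None or self._pid != pid:
            # First use, or we are in a forked child that inherited the
            # parent's session: open a fresh one for this process.
            self._session = self._factory()
            self._pid = pid
        return self._session
```

A task would then call `holder.get()` each time it needs a session, where `holder` is a module-level `PerProcessSession` instance. Within one process the same object is returned, and the first call after a fork creates a new one.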
I had a similar issue. It seemed to be related to sharing the Cassandra session between tasks. I solved it by creating a session per thread. Make sure you call get_session() from your tasks, and define it like this:
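import threading

from cassandra.cluster import Cluster

# `settings` is assumed to be your project's settings object (e.g. django.conf.settings).
thread_local = threading.local()

def get_session():
    # Reuse the session already created for this thread, if any.
    if hasattr(thread_local, "cassandra_session"):
        return thread_local.cassandra_session
    cluster = Cluster(settings.CASSANDRA_HOSTS)
    session = cluster.connect(settings.CASSANDRA_KEYSPACE)
    thread_local.cassandra_session = session
    return session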
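To illustrate why a threading.local() cache gives each thread its own session, here is a minimal sketch that runs without a Cassandra cluster. `FakeSession` is a hypothetical stand-in for a real driver session, and `collect_sessions` is a helper invented for this demonstration. The `get_session()` here mirrors the one above but takes a factory so it can be exercised in isolation.

```python
import threading


class FakeSession(object):
    """Hypothetical stand-in for a real Cassandra session."""

    def __init__(self):
        self.owner = threading.current_thread().name


_local = threading.local()


def get_session(factory=FakeSession):
    """Return this thread's session, creating it on first use."""
    session = getattr(_local, "session", None)
    if session is None:
        session = factory()
        _local.session = session
    return session


def collect_sessions(n_threads=3):
    """Call get_session() twice in each thread and record both results."""
    results = []
    lock = threading.Lock()

    def worker():
        first, second = get_session(), get_session()
        with lock:
            results.append((first, second))

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

Within a single thread both calls return the same object, while different threads never share one. Note that this isolates threads only. With a prefork worker pool each child process also needs its own session.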
Inspired by Ron's answer, I came up with the following code to put in tasks.py:
import threading

from django.conf import settings
from cassandra.cluster import Cluster
from celery.signals import worker_process_init, worker_process_shutdown

thread_local = threading.local()


@worker_process_init.connect
def open_cassandra_session(*args, **kwargs):
    cluster = Cluster([settings.DATABASES["cassandra"]["HOST"]], protocol_version=3)
    session = cluster.connect(settings.DATABASES["cassandra"]["NAME"])
    thread_local.cassandra_session = session


@worker_process_shutdown.connect
def close_cassandra_session(*args, **kwargs):
    session = thread_local.cassandra_session
    session.shutdown()
    thread_local.cassandra_session = None
This neat solution automatically opens and closes Cassandra sessions when a Celery worker process starts and stops.
Side note: protocol_version=3, because Cassandra 2.1 only supports protocol versions 3 and lower.