I've installed Celery and I'm trying to test it with the Celery First Steps Doc.
I tried using both Redis and RabbitMQ as brokers and backends, but I can't get the result with:
result.get(timeout=10)
Each time, I get this error:
Traceback (most recent call last):
File "<input>", line 11, in <module>
File "/home/mehdi/.virtualenvs/python3/lib/python3.4/site-packages/celery/result.py", line 169, in get
no_ack=no_ack,
File "/home/mehdi/.virtualenvs/python3/lib/python3.4/site-packages/celery/backends/base.py", line 225, in wait_for
raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.
The broker part seems to work just fine: when I run this code
from celery import Celery

app = Celery('tasks', backend='redis://localhost/', broker='amqp://')

@app.task
def add(x, y):
    return x + y

result = add.delay(4, 4)
I get (as expected) the following in the worker log:
[2015-08-04 12:05:44,910: INFO/MainProcess] Received task: tasks.add[741160b8-cb7b-4e63-93c3-f5e43f8f8a02]
[2015-08-04 12:05:44,911: INFO/MainProcess] Task tasks.add[741160b8-cb7b-4e63-93c3-f5e43f8f8a02] succeeded in 0.0004287530000510742s: 8
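For completeness, the full sequence that fails looks roughly like this (the app and task definitions are the same as above; only the last line is new):

from celery import Celery

app = Celery('tasks', backend='redis://localhost/', broker='amqp://')

@app.task
def add(x, y):
    return x + y

result = add.delay(4, 4)
result.get(timeout=10)  # this is the call that raises celery.exceptions.TimeoutError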
P.S.: I'm using Xubuntu 64-bit.
EDIT:
My app.conf:
{'CELERY_RESULT_DB_TABLENAMES': None,
'BROKER_TRANSPORT_OPTIONS': {},
'BROKER_USE_SSL': False,
'CELERY_BROADCAST_QUEUE': 'celeryctl',
'EMAIL_USE_TLS': False,
'CELERY_STORE_ERRORS_EVEN_IF_IGNORED': False,
'CELERY_CREATE_MISSING_QUEUES': True,
'CELERY_DEFAULT_QUEUE': 'celery',
'CELERY_SEND_TASK_SENT_EVENT': False,
'CELERYD_TASK_TIME_LIMIT': None,
'BROKER_URL': 'amqp://',
'CELERY_EVENT_QUEUE_EXPIRES': None,
'CELERY_DEFAULT_EXCHANGE_TYPE': 'direct',
'CELERYBEAT_SCHEDULER': 'celery.beat:PersistentScheduler',
'CELERY_MAX_CACHED_RESULTS': 100,
'CELERY_RESULT_PERSISTENT': None,
'CELERYD_POOL': 'prefork',
'CELERYD_AGENT': None,
'EMAIL_HOST': 'localhost',
'CELERY_CACHE_BACKEND_OPTIONS': {},
'BROKER_HEARTBEAT': None,
'CELERY_RESULT_ENGINE_OPTIONS': None,
'CELERY_RESULT_SERIALIZER': 'pickle',
'CELERYBEAT_SCHEDULE_FILENAME': 'celerybeat-schedule',
'CELERY_REDIRECT_STDOUTS_LEVEL': 'WARNING',
'CELERY_IMPORTS': (),
'SERVER_EMAIL': 'celery@localhost',
'CELERYD_TASK_LOG_FORMAT': '[%(asctime)s: %(levelname)s/%(processName)s] %(task_name)s[%(task_id)s]: %(message)s',
'CELERY_SECURITY_CERTIFICATE': None,
'CELERYD_LOG_COLOR': None,
'CELERY_RESULT_EXCHANGE': 'celeryresults',
'CELERY_TRACK_STARTED': False,
'CELERY_REDIS_PASSWORD': None,
'BROKER_USER': None,
'CELERY_COUCHBASE_BACKEND_SETTINGS': None,
'CELERY_RESULT_EXCHANGE_TYPE': 'direct',
'CELERY_REDIS_DB': None,
'CELERYD_TIMER_PRECISION': 1.0,
'CELERY_REDIS_PORT': None,
'BROKER_TRANSPORT': None,
'CELERYMON_LOG_FILE': None,
'CELERYD_CONCURRENCY': 0,
'CELERYD_HIJACK_ROOT_LOGGER': True,
'BROKER_VHOST': None,
'CELERY_DEFAULT_EXCHANGE': 'celery',
'CELERY_DEFAULT_ROUTING_KEY': 'celery',
'CELERY_ALWAYS_EAGER': False,
'EMAIL_TIMEOUT': 2,
'CELERYD_TASK_SOFT_TIME_LIMIT': None,
'CELERY_WORKER_DIRECT': False,
'CELERY_REDIS_HOST': None,
'CELERY_QUEUE_HA_POLICY': None,
'BROKER_PORT': None,
'CELERYD_AUTORELOADER': 'celery.worker.autoreload:Autoreloader',
'BROKER_CONNECTION_TIMEOUT': 4,
'CELERY_ENABLE_REMOTE_CONTROL': True,
'CELERY_RESULT_DB_SHORT_LIVED_SESSIONS': False,
'CELERY_EVENT_SERIALIZER': 'json',
'CASSANDRA_DETAILED_MODE': False,
'CELERY_REDIS_MAX_CONNECTIONS': None,
'CELERY_CACHE_BACKEND': None,
'CELERYD_PREFETCH_MULTIPLIER': 4,
'BROKER_PASSWORD': None,
'CELERY_BROADCAST_EXCHANGE_TYPE': 'fanout',
'CELERY_EAGER_PROPAGATES_EXCEPTIONS': False,
'CELERY_IGNORE_RESULT': False,
'CASSANDRA_KEYSPACE': None,
'EMAIL_HOST_PASSWORD': None,
'CELERYMON_LOG_LEVEL': 'INFO',
'CELERY_DISABLE_RATE_LIMITS': False,
'CELERY_TASK_PUBLISH_RETRY_POLICY': {'interval_start': 0,
'interval_max': 1,
'max_retries': 3,
'interval_step': 0.2},
'CELERY_SECURITY_KEY': None,
'CELERY_MONGODB_BACKEND_SETTINGS': None,
'CELERY_DEFAULT_RATE_LIMIT': None,
'CELERYBEAT_SYNC_EVERY': 0,
'CELERY_EVENT_QUEUE_TTL': None,
'CELERYD_POOL_PUTLOCKS': True,
'CELERY_TASK_SERIALIZER': 'pickle',
'CELERYD_WORKER_LOST_WAIT': 10.0,
'CASSANDRA_SERVERS': None,
'CELERYD_POOL_RESTARTS': False,
'CELERY_TASK_PUBLISH_RETRY': True,
'CELERY_ENABLE_UTC': True,
'CELERY_SEND_EVENTS': False,
'BROKER_CONNECTION_MAX_RETRIES': 100,
'CELERYD_LOG_FILE': None,
'CELERYD_FORCE_EXECV': False,
'CELERY_CHORD_PROPAGATES': True,
'CELERYD_AUTOSCALER': 'celery.worker.autoscale:Autoscaler',
'CELERYD_STATE_DB': None,
'CELERY_ROUTES': None,
'CELERYD_TIMER': None,
'ADMINS': (),
'BROKER_HEARTBEAT_CHECKRATE': 3.0,
'CELERY_ACCEPT_CONTENT': ['json',
'pickle',
'msgpack',
'yaml'],
'BROKER_LOGIN_METHOD': None,
'BROKER_CONNECTION_RETRY': True,
'CELERY_TIMEZONE': None,
'CASSANDRA_WRITE_CONSISTENCY': None,
'CELERYBEAT_MAX_LOOP_INTERVAL': 0,
'CELERYD_LOG_LEVEL': 'WARN',
'CELERY_REDIRECT_STDOUTS': True,
'BROKER_POOL_LIMIT': 10,
'CELERY_SECURITY_CERT_STORE': None,
'CELERYD_CONSUMER': 'celery.worker.consumer:Consumer',
'CELERY_INCLUDE': (),
'CELERYD_MAX_TASKS_PER_CHILD': None,
'CELERYD_LOG_FORMAT': '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s',
'CELERY_ANNOTATIONS': None,
'CELERY_MESSAGE_COMPRESSION': None,
'CASSANDRA_READ_CONSISTENCY': None,
'EMAIL_USE_SSL': False,
'CELERY_SEND_TASK_ERROR_EMAILS': False,
'CELERY_QUEUES': None,
'CELERY_ACKS_LATE': False,
'CELERYMON_LOG_FORMAT': '[%(asctime)s: %(levelname)s] %(message)s',
'CELERY_TASK_RESULT_EXPIRES': datetime.timedelta(1),
'BROKER_HOST': None,
'EMAIL_PORT': 25,
'BROKER_FAILOVER_STRATEGY': None,
'CELERY_RESULT_BACKEND': 'rpc://',
'CELERY_BROADCAST_EXCHANGE': 'celeryctl',
'CELERYBEAT_LOG_FILE': None,
'CELERYBEAT_SCHEDULE': {},
'CELERY_RESULT_DBURI': None,
'CELERY_DEFAULT_DELIVERY_MODE': 2,
'CELERYBEAT_LOG_LEVEL': 'INFO',
'CASSANDRA_COLUMN_FAMILY': None,
'EMAIL_HOST_USER': None}
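(For what it's worth, a quick way to check these values from a Python shell looks roughly like this; the tasks module name is just an assumption about where the app from my snippet above lives:)

from tasks import app  # assumes the app defined above lives in tasks.py

print(app.conf.CELERY_RESULT_BACKEND)  # the result backend actually in effect
print(app.conf.BROKER_URL)             # the broker URL actually in effect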
After modifying the tasks, it is necessary to restart the Celery worker so it re-reads the changes.
It finally worked with this project layout:
proj/celery_proj/__init__.py
/celery.py
/tasks.py
/test.py
Where
celery.py
from __future__ import absolute_import

from celery import Celery

app = Celery('celery_proj',
             broker='amqp://',
             backend='amqp://',
             include=['celery_proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
)

if __name__ == '__main__':
    app.start()
tasks.py
from __future__ import absolute_import

from celery_proj.celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y

@app.task
def xsum(numbers):
    return sum(numbers)
test.py
__author__ = 'mehdi'

import sys

path = '/home/mehdi/PycharmProjects'
sys.path.append(path)

from celery_proj.tasks import add

r = add.delay(4, 4)
print(r.status)
print(r.result)
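A variant of test.py that blocks until the worker has processed the task (assuming the worker started below is already running) would look roughly like this; the get() call is the same one that originally timed out:

import sys
sys.path.append('/home/mehdi/PycharmProjects')  # same path hack as in test.py

from celery_proj.tasks import add

r = add.delay(4, 4)
print(r.get(timeout=10))  # blocks until the result arrives, then prints 8
print(r.status)           # SUCCESS once the result has been retrieved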
And launching the worker with:
cd proj
celery -A celery_proj worker -l info
And then running test.py:
python test.py
The 'backend' configuration can no longer be passed to the Celery object as an optional parameter, but should instead be set through the Python configuration setting CELERY_RESULT_BACKEND (see https://github.com/celery/celery/issues/2146).
So the tasks.py (from the Celery tutorial) should look something like:
from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')
app.config_from_object('celeryconfig')

@app.task
def add(x, y):
    print('[' + str(x) + '][' + str(y) + ']=' + str(x + y))
    return x + y
Create a file celeryconfig.py in the same directory as tasks.py with the following content:
CELERY_RESULT_BACKEND='amqp://'
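With that in place, a quick check from a Python shell in the same directory could look like this (assuming a worker has been started, e.g. with celery -A tasks worker -l info):

from tasks import add

result = add.delay(4, 4)
print(result.get(timeout=10))  # should print 8 now that the amqp result backend is configured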