After running through the basic example for flask-celery(runs fine as far as I can tell) I'm trying to integrate this to my own project. Basically, I'm using this below:
from flask import Blueprint, jsonify, request, session
from flask.views import MethodView
from celery.decorators import task
blueprint = Blueprint('myapi', __name__)
class MyAPI(MethodView):
def get(self, tag):
return get_resource.apply_async(tag)
@task(name="get_task")
def get_resource(tag):
pass
with the same setup as in the example, I'm getting this error:
Traceback (most recent call last):
File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1518, in __call__
return self.wsgi_app(environ, start_response)
File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1506, in wsgi_app
response = self.make_response(self.handle_exception(e))
File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1504, in wsgi_app
response = self.full_dispatch_request()
File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1264, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1262, in full_dispatch_request
rv = self.dispatch_request()
File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1248, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/x/venv/lib/python2.7/site-packages/flask/views.py", line 84, in view
return self.dispatch_request(*args, **kwargs)
File "/x/venv/lib/python2.7/site-packages/flask/views.py", line 151, in dispatch_request
return meth(*args, **kwargs)
File "/x/api/modules/document/document.py", line 14, in get
return get_resource.apply_async(tag)
File "/x/venv/lib/python2.7/site-packages/celery/app/task/__init__.py", line 449, in apply_async
publish = publisher or self.app.amqp.publisher_pool.acquire(block=True)
File "/x/venv/lib/python2.7/site-packages/kombu/connection.py", line 657, in acquire
R = self.prepare(R)
File "/x/venv/lib/python2.7/site-packages/kombu/pools.py", line 54, in prepare
p = p()
File "/x/venv/lib/python2.7/site-packages/kombu/pools.py", line 45, in <lambda>
return lambda: self.create_producer()
File "/x/venv/lib/python2.7/site-packages/celery/app/amqp.py", line 265, in create_producer
pub = self.app.amqp.TaskPublisher(conn, auto_declare=False)
File "/x/venv/lib/python2.7/site-packages/celery/app/amqp.py", line 328, in TaskPublisher
return TaskPublisher(*args, **self.app.merge(defaults, kwargs))
File "/x/venv/lib/python2.7/site-packages/celery/app/amqp.py", line 158, in __init__
super(TaskPublisher, self).__init__(*args, **kwargs)
File "/x/venv/lib/python2.7/site-packages/kombu/compat.py", line 61, in __init__
super(Publisher, self).__init__(connection, self.exchange, **kwargs)
File "/x/venv/lib/python2.7/site-packages/kombu/messaging.py", line 79, in __init__
self.revive(self.channel)
File "/x/venv/lib/python2.7/site-packages/kombu/messaging.py", line 168, in revive
channel = channel.default_channel
File "/x/venv/lib/python2.7/site-packages/kombu/connection.py", line 581, in default_channel
self.connection
File "/x/venv/lib/python2.7/site-packages/kombu/connection.py", line 574, in connection
self._connection = self._establish_connection()
File "/x/venv/lib/python2.7/site-packages/kombu/connection.py", line 533, in _establish_connection
conn = self.transport.establish_connection()
File "/x/venv/lib/python2.7/site-packages/kombu/transport/amqplib.py", line 279, in establish_connection
connect_timeout=conninfo.connect_timeout)
File "/x/venv/lib/python2.7/site-packages/kombu/transport/amqplib.py", line 89, in __init__
super(Connection, self).__init__(*args, **kwargs)
File "/x/venv/lib/python2.7/site-packages/amqplib/client_0_8/connection.py", line 129, in __init__
self.transport = create_transport(host, connect_timeout, ssl)
File "/x/venv/lib/python2.7/site-packages/amqplib/client_0_8/transport.py", line 281, in create_transport
return TCPTransport(host, connect_timeout)
File "/x/venv/lib/python2.7/site-packages/amqplib/client_0_8/transport.py", line 85, in __init__
raise socket.error, msg
error: [Errno 111] Connection refused
-->
I'm using redis, and if I install rabbitmq I get another error, but I do not understand this right now --the broker should be redis but its isn't finding it or what? Can anyone give me more of a clue what is going on here? Do I need to import something else, etc. The point is, there is very little beyond the bare bones example and this makes no sense to me.
The most I've been able to determine as that in the Api module there is no access to the 'celery' and when it goes to try and put data there when at the app level, the celery there falls into some defaults, which aren't installed because I'm pointing to redis. Just a guess. I haven't been able to import information into the module, only determined that calling anything 'celery'(for example, output celery.conf) from the app causes an error -- although I could import celery.task.
This is the broker config the application is using, direct from the example:
BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = "redis"
CELERY_REDIS_HOST = "localhost"
CELERY_REDIS_PORT = 6379
CELERY_REDIS_DB = 0
EDIT:
If you'd like to see a demo: https://github.com/thrisp/flask-celery-example
AS it turns out having BROKER_TRANSPORT = 'redis' in your settings is important for whatever is being passed that I'm passing in (for the setup I put forth here and in the git example), I'm not entirely sure why it isn't in the example bits, but is in the ones I've added but it is -- without this it wants to dump everything onto a default ampq queue.
EDIT2:
Also, this a rather a big deal, using the upcoming version of Celery simplifies 10,000 issues when using it with Flask, making all of this unecesssary.
You must configure redis to bind the localhost. In /etc/redis/redis.conf
, uncomment the line with
bind 127.0.0.1
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With