I configured my project, referring to this answer: How to use Flask-SQLAlchemy in a Celery task
My extension.py file:
import flask
from flask_sqlalchemy import SQLAlchemy
from config import BaseConfig
from celery import Celery
from flask_mail import Mail
from celery.schedules import crontab


class FlaskCelery(Celery):

    def __init__(self, *args, **kwargs):
        super(FlaskCelery, self).__init__(*args, **kwargs)
        self.patch_task()

        if 'app' in kwargs:
            self.init_app(kwargs['app'])

    def patch_task(self):
        TaskBase = self.Task
        _celery = self

        class ContextTask(TaskBase):
            abstract = True

            def __call__(self, *args, **kwargs):
                if flask.has_app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
                else:
                    with _celery.app.app_context():
                        return TaskBase.__call__(self, *args, **kwargs)

        self.Task = ContextTask

    def init_app(self, app):
        self.app = app
        self.config_from_object(app.config)


mail = Mail()
db = SQLAlchemy()
settings = BaseConfig()
celery = FlaskCelery()
celery = FlaskCelery()
Then in my app_settings.py, I created this app:
app = Flask('app', instance_relative_config=True)
And configured celery:
celery.init_app(app)
I ran the Flask project with python manage.py run:
app.run(
    debug=settings.get('DEBUG', False),
    host=settings.get('HOST', '127.0.0.1'),
    port=settings.get('PORT', 5000)
)
And ran celery:
celery -A manage.celery worker --beat -l debug
Celery log looks good:
[tasks]
. app.api.tasks.spin_file
. app.main.tasks.send_async_email
. celery.backend_cleanup
. celery.chain
...
Then in views.py, I call this task:
send_async_email.delay(*args, **kwargs)
But all tasks are being ignored by Celery. Nothing happens, no errors, no warnings. Nothing. What am I doing wrong?
EDIT: When I start celery with this command: celery -A manage.celery worker --beat -l debug
I get the following warning:
[2015-09-21 10:04:32,220: WARNING/MainProcess] /home/.virtualenvs/myproject/local/lib/python2.7/site-packages/celery/app/control.py:36: DuplicateNodenameWarning: Received multiple replies from node name: 'name'.
Please make sure you give each node a unique nodename using the `-n` option.
pluralize(len(dupes), 'name'), ', '.join(sorted(dupes)),
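The warning itself suggests the fix: pass `-n` to give each worker a unique node name. A hedged sketch of how that invocation might look (the worker name `worker1` is hypothetical; `%h` is Celery's hostname placeholder):

```shell
# Give each worker node a unique name with -n so control replies
# are not duplicated across nodes.
celery -A manage.celery worker --beat -l debug -n worker1@%h
```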
The process of task execution by Celery can be broken down into three steps: your application sends the task to the task broker; a worker then reserves it and executes it; and finally the result of the execution is stored in the result backend.
The "shared_task" decorator allows creating Celery tasks for reusable apps, since it doesn't need a concrete Celery app instance. It is also an easier way to define a task, as you don't need to import the Celery app instance.
I am not sure that this will help you, but I use this code in many of my projects whenever I need Celery:
from flask import Flask, request, jsonify as jsn
from celery import Celery

app = Flask(__name__)
app.config.update(dict(
    SECRET_KEY='blabla'
))

# Celery configuration
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'database'
app.config['CELERY_RESULT_DBURI'] = 'sqlite:///temp.db'
app.config['CELERY_TRACK_STARTED'] = True
app.config['CELERY_SEND_EVENTS'] = True

# Initialize Celery
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)


@celery.task
def do_something(data):
    from celery import current_task
    import os
    import subprocess
    with app.app_context():
        # run some bash script with some params in my case
        pass
And then I run Celery under supervisor via:
#!/bin/bash
cd /project/location && . venv/bin/activate && celery worker -A appname.celery --loglevel=info --purge #appname is my main flask file
And of course, in my route I have something like:
@app.route('/someroute', methods=["POST"])
def someroute():
    result = do_something.delay(data)
    print(result.id)