I'm new to Celery and am trying to run asynchronous tasks with it.
The Celery project examples didn't help me much. Can anyone point me to some working examples?
celery beat is a scheduler. It kicks off tasks at regular intervals, which are then executed by the worker nodes available in the cluster. By default the entries are taken from the CELERYBEAT_SCHEDULE setting, but custom stores can also be used, like storing the entries in an SQL database.
To use MongoDB as your backend store you have to explicitly configure Celery to use MongoDB as the backend.
http://docs.celeryproject.org/en/latest/getting-started/brokers/mongodb.html#broker-mongodb
As you said, the documentation does not show a complete working example. I have only just started playing with Celery, but I have been using MongoDB for a while. I created a short working tutorial using MongoDB and Celery: http://skillachie.com/?p=953
However, these snippets should contain all you need to get a hello world going with Celery and MongoDB:
celeryconfig.py
from celery.schedules import crontab

CELERY_RESULT_BACKEND = "mongodb"
CELERY_MONGODB_BACKEND_SETTINGS = {
    "host": "127.0.0.1",
    "port": 27017,
    "database": "jobs",
    "taskmeta_collection": "stock_taskmeta_collection",
}
# Used to run tasks periodically, optionally passing arguments.
# This covers periodic runs only; for a one-off delayed run, a task can
# instead be called with apply_async(countdown=...) or apply_async(eta=...).
CELERYBEAT_SCHEDULE = {
    'every-minute': {
        'task': 'tasks.add',
        'schedule': crontab(minute='*/1'),
        'args': (1, 2),
    },
}
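To make the crontab(minute='*/1') entry above more concrete, here is a minimal sketch of how a cron-style minute field expands into the minutes at which a task would fire. The helper expand_minute_field is hypothetical and written only for illustration; it is not Celery's own parser and it handles just '*', '*/N' and a single integer.

```python
from typing import List


def expand_minute_field(field: str) -> List[int]:
    """Expand a cron-style minute field into the matching minutes of an hour.

    Illustration only: supports '*', '*/N' and a single integer.
    """
    if field == "*":
        return list(range(60))
    if field.startswith("*/"):
        step = int(field[2:])
        if step <= 0:
            raise ValueError("step must be positive")
        return list(range(0, 60, step))
    minute = int(field)
    if not 0 <= minute < 60:
        raise ValueError("minute must be in 0..59")
    return [minute]


# '*/1' matches every minute, so the task is queued 60 times per hour.
print(len(expand_minute_field("*/1")))
# '*/15' matches minutes 0, 15, 30 and 45.
print(expand_minute_field("*/15"))
```

In other words, '*/1' behaves the same as '*' for the minute field; a larger step such as '*/15' is where the syntax starts to matter.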
tasks.py
import time

from celery import Celery

# Specify the MongoDB host and database to connect to
BROKER_URL = 'mongodb://localhost:27017/jobs'

celery = Celery('EOD_TASKS', broker=BROKER_URL)

# Load the backend settings used to store the results of jobs
celery.config_from_object('celeryconfig')


@celery.task
def add(x, y):
    time.sleep(30)  # simulate a long-running job
    return x + y
I have been testing RabbitMQ as a broker and MongoDB as backend, and MongoDB as both broker and backend. These are my findings. I hope they help someone out there.
Assumption: You have MongoDB running on default settings (localhost:27017).
Setting up the environment using conda (you can use whatever package manager you prefer):
conda update -n base conda -c anaconda
conda create -n apps python=3.6 pymongo
conda install -n apps -c conda-forge celery
conda activate apps
These commands update conda, create an environment called apps with pymongo installed, install celery into it, and activate it.
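A quick way to confirm that the environment is set up is to check that both packages can be found from Python. This is only a sketch using the standard library; check_packages is a hypothetical helper name, and it should be run inside the activated apps environment.

```python
import importlib.util
from typing import Dict, Iterable


def check_packages(names: Iterable[str]) -> Dict[str, bool]:
    """Map each top-level package name to whether Python can locate it."""
    return {name: importlib.util.find_spec(name) is not None for name in names}


if __name__ == "__main__":
    for name, found in check_packages(["celery", "pymongo"]).items():
        print(f"{name}: {'found' if found else 'MISSING'}")
```

If either package is reported as missing, the conda environment is probably not the active one.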
RabbitMQ as a broker and MongoDB as backend
sudo apt install rabbitmq-server
sudo service rabbitmq-server restart
sudo rabbitmqctl status
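Alongside the status command, it can be handy to confirm from Python that the services are accepting TCP connections. The sketch below assumes the default ports, 5672 for RabbitMQ and 27017 for MongoDB, both on localhost; is_port_open is a hypothetical helper, and an open port only shows that something is listening, not that it is correctly configured.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unresolvable hosts.
        return False


if __name__ == "__main__":
    print("RabbitMQ (5672):", is_port_open("localhost", 5672))
    print("MongoDB (27017):", is_port_open("localhost", 27017))
```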
If there are no errors, RabbitMQ ought to be running. Let's create tasks in executor.py and call them in runner.py.
# executor.py
import time

from celery import Celery

BROKER_URL = 'amqp://localhost//'
BACKEND_URL = 'mongodb://localhost:27017/from_celery'

app = Celery('executor', broker=BROKER_URL, backend=BACKEND_URL)


@app.task
def pizza_bot(string: str, snooze=10):
    '''return a dictionary with bot and
    lower case string input
    '''
    print(f'Pretending to be working {snooze} seconds')
    time.sleep(snooze)
    return {'bot': string.lower()}
We then call the task in runner.py:
# runner.py
import time
from datetime import datetime

from executor import pizza_bot


def run_pizza(msg: str, use_celery: bool = True):
    start_time = datetime.now()
    if use_celery:  # Using celery: returns a result handle immediately
        response = pizza_bot.delay(msg)
    else:  # Not using celery: runs the function in this process
        response = pizza_bot(msg)
    # With celery this measures only the time taken to queue the task
    print(f'It took {datetime.now() - start_time} to run!')
    print(f'response: {response}')
    return response


if __name__ == '__main__':
    # Call using celery
    response = run_pizza('This finishes extra fast')
    while not response.ready():
        print(f'[Waiting] It is {response.ready()} that we have results')
        time.sleep(2)  # sleep for two seconds
    print('\n We got results:')
    print(response.result)
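The while not response.ready() loop above waits forever if no worker is running. A minimal sketch of the same polling idea with a deadline is shown below. The names wait_for_result and FakeResult are hypothetical; FakeResult only stands in for the object returned by pizza_bot.delay() so that the snippet runs without a broker. Celery's own result object also offers get(timeout=...), which may be simpler in real code.

```python
import time


def wait_for_result(async_result, timeout: float = 60.0, poll_interval: float = 2.0):
    """Poll async_result.ready() until it is True or timeout seconds pass."""
    deadline = time.monotonic() + timeout
    while not async_result.ready():
        if time.monotonic() >= deadline:
            raise TimeoutError(f"no result after {timeout} seconds")
        time.sleep(poll_interval)
    return async_result.result


class FakeResult:
    """Hypothetical stand-in for the object returned by pizza_bot.delay()."""

    def __init__(self, polls_until_ready: int, value):
        self._remaining = polls_until_ready
        self.result = value

    def ready(self) -> bool:
        # Report "not ready" for the first polls_until_ready calls.
        self._remaining -= 1
        return self._remaining < 0


# Becomes ready on the third poll, then returns the stored value.
print(wait_for_result(FakeResult(2, {"bot": "hi"}), timeout=5, poll_interval=0.01))
```

In runner.py the same function could be called as wait_for_result(response), assuming response came from pizza_bot.delay().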
Run celery on terminal A:
cd path_to_our_python_files
celery -A executor.app worker --loglevel=info
Running the worker in the foreground like this is for development only; I wanted to see what was happening in the background. In production, run it as a daemon.
Run runner.py on terminal B:
cd path_to_our_python_files
conda activate apps
python runner.py
In terminal A, you will see that the task is received and, after snooze seconds, completed. In MongoDB, you will see a new database called from_celery, whose task metadata collection (celery_taskmeta by default) holds the task status and result.
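The name from_celery comes from the path part of BACKEND_URL, which MongoDB treats as a database name rather than a collection. The sketch below splits such a URL with the standard library to show which part is which. describe_mongo_url is a hypothetical helper and deliberately simplified: it assumes a single host and ignores credentials and options.

```python
from urllib.parse import urlsplit


def describe_mongo_url(url: str) -> dict:
    """Split a simple single-host mongodb:// URL into host, port and database."""
    parts = urlsplit(url)
    return {
        "host": parts.hostname,
        "port": parts.port,
        "database": parts.path.lstrip("/") or None,
    }


print(describe_mongo_url("mongodb://localhost:27017/from_celery"))
# {'host': 'localhost', 'port': 27017, 'database': 'from_celery'}
```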
MongoDB as both broker and backend
Only a small modification was needed to set this up. As mentioned above, I created a config file to hold the MongoDB backend settings.
# mongo_config.py
# Backend settings
CELERY_RESULT_BACKEND = "mongodb"
CELERY_MONGODB_BACKEND_SETTINGS = {
    "host": "localhost",
    "port": 27017,
    "database": "celery",
    "taskmeta_collection": "pizza_collection",
}
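Newer Celery releases document lowercase setting names, so the same backend configuration could probably be written as below. This is a sketch that assumes Celery 4 or later; the file name is hypothetical, and as far as I know old uppercase and new lowercase names should not be mixed in one configuration.

```python
# mongo_config_lowercase.py (hypothetical file name)
# Lowercase equivalents of the uppercase backend settings above.
result_backend = "mongodb://localhost:27017/"
mongodb_backend_settings = {
    "database": "celery",
    "taskmeta_collection": "pizza_collection",
}
```

It would be loaded the same way, with app.config_from_object pointing at this module instead.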
Let's create executor_updated.py, which is pretty much the same as executor.py, except that the broker is now MongoDB and the backend is added via config_from_object:
# executor_updated.py
import time

from celery import Celery

BROKER_URL = 'mongodb://localhost:27017/celery'

app = Celery('executor_updated', broker=BROKER_URL)

# Load backend settings
app.config_from_object('mongo_config')


@app.task
def pizza_bot(string: str, snooze=10):
    '''return a dictionary with bot and
    lower case string input
    '''
    print(f'Pretending to be working {snooze} seconds')
    time.sleep(snooze)
    return {'bot': string.lower()}
Run celery on terminal C:
cd path_to_our_python_files
celery -A executor_updated.app worker --loglevel=info
Run runner.py on terminal D, after changing its import to from executor_updated import pizza_bot so that it uses the new app:
cd path_to_our_python_files
conda activate apps
python runner.py
Now we have MongoDB as both broker and backend. In MongoDB, you will see a database called celery and, inside it, a collection called pizza_collection.
Hope this helps in getting you started with these awesome tools.