 

Working example of Celery with MongoDB

Tags:

mongodb

celery

I'm new to Celery and am working on running asynchronous tasks with it.

  1. I want to save the results of my tasks to MongoDB.
  2. I want to use the AMQP broker.

Celery project examples didn't help me much. Can anyone point me to some working examples?

asked Apr 01 '13 by sandeep reddy

People also ask

How does celery beat work?

celery beat is a scheduler. It kicks off tasks at regular intervals, which are then executed by the worker nodes available in the cluster. By default the entries are taken from the CELERYBEAT_SCHEDULE setting, but custom stores can also be used, like storing the entries in an SQL database.
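
For illustration, once a schedule like the CELERYBEAT_SCHEDULE in the first answer below is defined, the beat process is typically started alongside a worker; a minimal sketch (assuming a tasks module that loads that config) would be:

celery -A tasks beat --loglevel=info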


2 Answers

To use MongoDB as your result backend, you have to explicitly configure Celery to use it.

http://docs.celeryproject.org/en/latest/getting-started/brokers/mongodb.html#broker-mongodb

As you said, the documentation does not show a complete working example. I just started playing with Celery but have been using MongoDB. I created a short working tutorial using MongoDB and Celery: http://skillachie.com/?p=953

However, these snippets should contain all you need to get a hello-world example going with Celery and MongoDB.

celeryconfig.py

from celery.schedules import crontab

CELERY_RESULT_BACKEND = "mongodb"
CELERY_MONGODB_BACKEND_SETTINGS = {
    "host": "127.0.0.1",
    "port": 27017,
    "database": "jobs", 
    "taskmeta_collection": "stock_taskmeta_collection",
}

# Used to schedule tasks periodically, passing optional arguments.
# Can be very useful. Celery does not seem to support one-off scheduled tasks, only periodic ones.
CELERYBEAT_SCHEDULE = {
    'every-minute': {
        'task': 'tasks.add',
        'schedule': crontab(minute='*/1'),
        'args': (1,2),
    },
}

tasks.py

from celery import Celery
import time 

# Specify the MongoDB host and database to connect to
BROKER_URL = 'mongodb://localhost:27017/jobs'

celery = Celery('EOD_TASKS', broker=BROKER_URL)

# Load backend settings used to store job results
celery.config_from_object('celeryconfig')

@celery.task
def add(x, y):
    time.sleep(30)
    return x + y
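
To round out the hello world, here is a sketch of how you might start the worker (with the beat scheduler embedded via --beat) and call the task; the argument values and timeout are illustrative assumptions:

celery -A tasks worker --beat --loglevel=info

# in a separate Python shell
from tasks import add

result = add.delay(4, 4)        # queue the task on the MongoDB broker
print(result.get(timeout=60))   # blocks until the worker stores the result (the task sleeps 30 s)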

answered Oct 18 '22 by Skillachie

I have been testing RabbitMQ as a broker and MongoDB as backend, and MongoDB as both broker and backend. These are my findings. I hope they help someone out there.

Assumption: you have MongoDB running on default settings (localhost:27017). The environment is set up using conda (you can use whatever package manager):

conda update -n base conda -c anaconda
conda create -n apps python=3.6 pymongo
conda install -n apps -c conda-forge celery
conda activate apps

These commands update conda, create an environment called apps, and install pymongo and celery.

RabbitMQ as a broker and MongoDB as backend

sudo apt install rabbitmq-server
sudo service rabbitmq-server restart 
sudo rabbitmqctl status

If there are no errors, then rabbitmq ought to be running. Let's create tasks in executor.py and call them in runner.py.

# executor.py
import time
from celery import Celery

BROKER_URL = 'amqp://localhost//'
BACKEND_URL = 'mongodb://localhost:27017/from_celery'
app = Celery('executor', broker=BROKER_URL, backend=BACKEND_URL)

@app.task
def pizza_bot(string:str, snooze=10):
    '''return a dictionary with bot and
    lower case string input
    '''
    print(f'Pretending to be working {snooze} seconds')
    time.sleep(snooze)
    return {'bot':string.lower()}

and we call them in runner.py

# runner.py
import time
from datetime import datetime

from executor import pizza_bot


def run_pizza(msg:str, use_celery:bool=True):

    start_time = datetime.now()
    if use_celery:  # Using celery
        response = pizza_bot.delay(msg)
    else:  # Not using celery
        response = pizza_bot(msg)

    print(f'It took {datetime.now()-start_time} to run!')
    print(f'response: {response}')

    return response

if __name__ == '__main__':

    # Call using celery
    response = run_pizza('This finishes extra fast')
    while not response.ready():
        print(f'[Waiting] It is {response.ready()} that we have results')
        time.sleep(2)  # sleep two seconds
    print('\n We got results:')
    print(response.result)

Run celery on terminal A:

cd path_to_our_python_files
celery -A executor.app worker --loglevel=info

This is for development only; I wanted to see what was happening in the background. In production, run the worker as a daemon.
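
For example, a sketch of one daemonization option using celery multi (the node name and pid/log paths here are assumptions):

celery multi start w1 -A executor --loglevel=info \
    --pidfile=/var/run/celery/%n.pid \
    --logfile=/var/log/celery/%n%I.log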

Run runner.py on terminal B:

cd path_to_our_python_files
conda activate apps
python runner.py

In terminal A, you will see that the task is received and completed after snooze seconds. In MongoDB, you will see a new database called from_celery holding the messages and results.
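
If you want to inspect what the backend stored, a quick pymongo check works (a sketch; celery_taskmeta is the backend's default collection name and is an assumption here):

from pymongo import MongoClient

client = MongoClient('localhost', 27017)
db = client['from_celery']                 # database named in BACKEND_URL
print(db.list_collection_names())          # expect something like ['celery_taskmeta']
for doc in db['celery_taskmeta'].find():   # default result collection (assumption)
    print(doc['status'], doc['result'])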

MongoDB as both broker and backend

Only a simple modification is needed to set this up. As mentioned, I had to create a config file with the MongoDB backend settings.

#mongo_config.py
#Backend Settings
CELERY_RESULT_BACKEND = "mongodb"
CELERY_MONGODB_BACKEND_SETTINGS = {
    "host": "localhost",
    "port": 27017,
    "database": "celery", 
    "taskmeta_collection": "pizza_collection",
}

Let's create executor_updated.py, which is pretty much the same as executor.py, except that the broker is now MongoDB and the backend is added via config_from_object:

# executor_updated.py
import time
from celery import Celery

BROKER_URL = 'mongodb://localhost:27017/celery'
app = Celery('executor_updated', broker=BROKER_URL)

#Load Backend Settings
app.config_from_object('mongo_config')

@app.task
def pizza_bot(string:str, snooze=10):
    '''return a dictionary with bot and
    lower case string input
    '''
    print(f'Pretending to be working {snooze} seconds')
    time.sleep(snooze)
    return {'bot':string.lower()}

Run celery on terminal C:

cd path_to_our_python_files
celery -A executor_updated.app worker --loglevel=info

Run runner.py on terminal D:

cd path_to_our_python_files
conda activate apps
python runner.py
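
Note that runner.py as written imports pizza_bot from executor, which is the RabbitMQ-brokered app. To exercise the MongoDB broker, point it at executor_updated instead; a one-line change, sketched here as an assumption of how you would wire it up:

# runner.py, for the MongoDB-as-broker test
from executor_updated import pizza_bot   # instead of: from executor import pizza_bot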

Now we have MongoDB as both broker and backend. In MongoDB, you will see a database called celery with a collection named pizza_collection.

Hope this helps in getting you started with these awesome tools.

answered Oct 18 '22 by Prayson W. Daniel