
Dynamically add periodic tasks in Celery

Is it possible to dynamically add periodic tasks to celery?

I'm using Flask, not Django, and I am building an app that is supposed to let users define recurring tasks through a web interface.

I've tried using Periodic Tasks from Celery 4.1, but to add new tasks I have to stop the Celery server, change the config (even if done through Python), and start it again. Maybe there's a way to load the config dynamically, without having to restart it?

I've considered having a crontab entry that restarts the Celery service every 5 minutes, but that seems to defeat the purpose: one of the reasons I chose Celery was to avoid using crontab.

Does anyone have any insight on this?

P.S.: I'm aware of another similar question, but it's from 2012. I was hoping things had changed since then, namely with the introduction of beat in v4.1.

asked Dec 06 '17 10:12 by ggaspar



1 Answer

This works with Celery 4.0.1+, Python 2.7, and Redis as the broker.

from celery import Celery
from celery.schedules import crontab  # used by add_task() below
import logging
import os
import sys

logger = logging.getLogger(__name__)
# Handle to this module, used to register and look up tasks by name
current_module = sys.modules[__name__]

CELERY_CONFIG = {
    'CELERY_BROKER_URL':
        'redis://{}/0'.format(os.environ.get('REDIS_URL', 'localhost:6379')),
    'CELERY_TASK_SERIALIZER': 'json',
}

celery = Celery(__name__, broker=CELERY_CONFIG['CELERY_BROKER_URL'])
celery.conf.update(CELERY_CONFIG)

I define a job in the following way:

job = {
    'task': 'my_function',                 # Name of a predefined function
    'schedule': {'minute': 0, 'hour': 0},  # crontab schedule
    'args': [2, 3],
    'kwargs': {}
}
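Since a job like this would arrive from a web form, it can help to check its shape before handing it to Celery. The following is only a sketch of such a check, not part of the original answer: `validate_job` and `CRONTAB_FIELDS` are hypothetical names, and the field set lists the time arguments accepted by `celery.schedules.crontab`. It is written for Python 3 and uses only the standard library.

```python
# Time fields accepted by celery.schedules.crontab
CRONTAB_FIELDS = {"minute", "hour", "day_of_week", "day_of_month", "month_of_year"}


def validate_job(job):
    """Return a list of problems found in a job dict (empty list means OK)."""
    if not isinstance(job, dict):
        return ["job must be a dict"]
    problems = []
    task = job.get("task")
    if not isinstance(task, str) or not task:
        problems.append("'task' must be a non-empty string")
    schedule = job.get("schedule", {})
    if not isinstance(schedule, dict):
        problems.append("'schedule' must be a dict")
    else:
        # Catch misspelled keys before they reach crontab(**schedule)
        unknown = sorted(set(schedule) - CRONTAB_FIELDS)
        if unknown:
            problems.append("unknown schedule fields: {}".format(", ".join(unknown)))
    if not isinstance(job.get("args", []), (list, tuple)):
        problems.append("'args' must be a list")
    if not isinstance(job.get("kwargs", {}), dict):
        problems.append("'kwargs' must be a dict")
    return problems
```

Returning a list of messages rather than a boolean makes it easier to show the user what to correct in the web interface.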

I then define a decorator like this:

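def add_to_module(f):
    # Wrap f as a Celery task (add_task needs .s()) and expose it on this module
    task = celery.task(f)
    setattr(current_module, 'tasks_{}__'.format(f.__name__), task)
    return task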

My task is

@add_to_module
def my_function(x, y, **kwargs):
    return x + y

Then add a function that adds the task on the fly:

def add_task(job):
    logger.info("Adding periodic job: %s", job)
    # Reject anything that is not a dict with a 'task' key
    if not (isinstance(job, dict) and 'task' in job):
        logger.error("Job %s is ill-formed", job)
        return False
    celery.add_periodic_task(
        crontab(**job.get('schedule', {'minute': 0, 'hour': 0})),
        get_from_module(job['task']).s(
            *job.get('args', []),
            **job.get('kwargs', {})
        ),
        name=job.get('name'),
        expires=job.get('expires')
    )
    return True


def get_from_module(f):
    return getattr(current_module, 'tasks_{}__'.format(f))
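The setattr/getattr pair above effectively acts as a name-to-function registry with a prefix that keeps arbitrary module attributes out of reach. As an alternative, and not what the answer itself does, the same lookup can be sketched with an explicit dict. `TASK_REGISTRY`, `register`, and `get_registered` are hypothetical names; the example registers a plain function and leaves out the Celery task wrapping so that it runs on its own.

```python
TASK_REGISTRY = {}  # hypothetical name: maps task name -> callable


def register(f):
    """Decorator that records f under its function name."""
    TASK_REGISTRY[f.__name__] = f
    return f


def get_registered(name):
    """Look up a registered callable, failing with a clear message."""
    try:
        return TASK_REGISTRY[name]
    except KeyError:
        raise KeyError("no task registered under {!r}".format(name))


@register
def my_function(x, y, **kwargs):
    return x + y
```

Only names that were explicitly registered can be resolved, which matters when the task name comes from user input.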

After this, you can link the add_task function to a URL and let users create periodic tasks from the functions defined in your current module.
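As a rough illustration of that last step, here is a framework-independent sketch of the request handling. `handle_add_task_request` is a hypothetical helper, not part of the answer: it takes the raw JSON body and the `add_task` function, and returns a status code and payload. In a Flask view you could pass it `request.get_data(as_text=True)` and turn the result into a response. It assumes `add_task` returns True or False as defined above, and that an unknown task name surfaces as an `AttributeError` from `get_from_module`.

```python
import json


def handle_add_task_request(body, add_task):
    """Parse a JSON request body and pass the job to add_task.

    Returns a (status_code, payload) pair for the web layer to send back.
    """
    try:
        job = json.loads(body)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return 400, {"error": "body is not valid JSON"}
    if not isinstance(job, dict) or "task" not in job:
        return 400, {"error": "job must be an object with a 'task' key"}
    try:
        added = add_task(job)
    except AttributeError:
        # Assumed to come from get_from_module() for an unknown task name
        return 404, {"error": "unknown task: {}".format(job["task"])}
    if not added:
        return 400, {"error": "job was rejected"}
    return 201, {"status": "scheduled", "task": job["task"]}
```

Keeping the parsing separate from the view function means it can be exercised with a stub in place of `add_task`, without a running broker.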

answered Sep 18 '22 15:09 by ananth krishnan