 

How to dynamically add / remove periodic tasks to Celery (celerybeat)

If I have a function defined as follows:

```python
def add(x, y):
    return x + y
```

Is there a way to dynamically add this function as a celery PeriodicTask and kick it off at runtime? I'd like to be able to do something like (pseudocode):

```
some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30"))
celery.beat.start(some_unique_task_id)
```

I would also want to stop or remove that task dynamically with something like (pseudocode):

```
celery.beat.remove_task(some_unique_task_id)
```

or

```
celery.beat.stop(some_unique_task_id)
```

FYI, I am not using djcelery, which lets you manage periodic tasks via the Django admin.

Jamie Forrest asked Apr 17 '12 16:04



People also ask

How do I import a periodic task into celery?

```python
from celery.schedules import crontab
from celery.decorators import periodic_task

@periodic_task(run_every=crontab(hour=7, minute=30, day_of_week=1))
def every_monday_morning():
    print("Execute every Monday at 7:30AM.")
```

Similarly, crontab() executes every minute, and crontab(minute=0, hour=0) executes daily at midnight.

How does celery beat work?

celery beat is a scheduler: it kicks off tasks at regular intervals, which are then executed by the available worker nodes in the cluster. By default, the entries are taken from the beat_schedule setting, but custom stores can also be used, such as storing the entries in a SQL database.

What is a periodic task in Django?

The django-celery-beat extension enables you to store the periodic task schedule in the database. The periodic tasks can be managed from the Django Admin interface, where you can create, edit and delete periodic tasks and set how often they should run.

What is a Celery task in Django?

Celery is a task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well. The execution units, called tasks, are executed concurrently on one or more worker servers.


2 Answers

This question was answered on Google Groups.

I AM NOT THE AUTHOR; all credit goes to Jean Mark.

Here's a proper solution for this; it's confirmed working. In my scenario, I wrapped PeriodicTask in a model of my own (TaskScheduler, which holds a foreign key to a PeriodicTask), since that lets me add other fields as I need them and also add the "terminate" method. You have to set the periodic task's enabled property to False and save it before you delete it. The wrapper model is not a must; the schedule_every method is the one that really does the work. When you're ready to terminate your task (if you didn't wrap it), you can just use PeriodicTask.objects.filter(name=...) to look up your task, disable it, and then delete it.

Hope this helps!

```python
import json
from datetime import datetime

from django.db import models
from djcelery.models import PeriodicTask, IntervalSchedule


class TaskScheduler(models.Model):

    periodic_task = models.ForeignKey(PeriodicTask, on_delete=models.CASCADE)

    @staticmethod
    def schedule_every(task_name, period, every, args=None, kwargs=None):
        """Schedule a task by name every "every" "period". An example call:

            TaskScheduler.schedule_every('mycustomtask', 'seconds', 30, [1, 2, 3])

        would schedule your custom task to run every 30 seconds with the
        arguments 1, 2 and 3 passed to the actual task.
        """
        permissible_periods = ['days', 'hours', 'minutes', 'seconds']
        if period not in permissible_periods:
            raise ValueError('Invalid period specified')
        # create some unique name for the periodic task
        ptask_name = "%s_%s" % (task_name, datetime.now())
        # reuse an interval schedule like this one if it already exists
        interval_schedule = IntervalSchedule.objects.filter(
            period=period, every=every).first()
        if interval_schedule is None:
            # create a brand new interval schedule
            # (should check to make sure "every" is a positive int)
            interval_schedule = IntervalSchedule.objects.create(
                period=period, every=every)
        ptask = PeriodicTask(name=ptask_name, task=task_name,
                             interval=interval_schedule)
        # djcelery stores args and kwargs as JSON-encoded text
        if args:
            ptask.args = json.dumps(args)
        if kwargs:
            ptask.kwargs = json.dumps(kwargs)
        ptask.save()
        return TaskScheduler.objects.create(periodic_task=ptask)

    def stop(self):
        """Pause the task."""
        ptask = self.periodic_task
        ptask.enabled = False
        ptask.save()

    def start(self):
        """Start (or resume) the task."""
        ptask = self.periodic_task
        ptask.enabled = True
        ptask.save()

    def terminate(self):
        """Disable the task, then delete it along with this wrapper."""
        self.stop()
        ptask = self.periodic_task
        self.delete()
        ptask.delete()
```
McP answered Sep 30 '22 09:09



This was finally made possible by a fix included in celery v4.1.0. Now, you just need to change the schedule entries in the database backend, and celery-beat will act according to the new schedule.

The docs vaguely describe how this works. The default scheduler for celery-beat, PersistentScheduler, uses a shelve file as its schedule database. Any changes to the beat_schedule dictionary in the PersistentScheduler instance are synced with this database (by default, every 3 minutes), and vice-versa. The docs describe how to add new entries to the beat_schedule using app.add_periodic_task. To modify an existing entry, just add a new entry with the same name. Delete an entry as you would from a dictionary: del app.conf.beat_schedule['name'].

Suppose you want to monitor and modify your celery beat schedule using an external app. Then you have several options:

  1. You can open the shelve database file and read its contents like a dictionary. Write back to this file for modifications.
  2. You can run another instance of the Celery app, and use that one to modify the shelve file as described above.
  3. You can use the custom scheduler class from django-celery-beat to store the schedule in a django-managed database, and access the entries there.
  4. You can use the scheduler from celerybeat-mongo to store the schedule in a MongoDB backend, and access the entries there.
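Option 1 can be sketched with the standard-library shelve module. This assumes the PersistentScheduler layout, where the entries are stored as a dict under the "entries" key of the shelve file (celerybeat-schedule by default); unpickling real entries also requires celery to be importable, and editing the file while beat is stopped is probably the safest assumption. The helper names are hypothetical:

```python
import shelve


def list_beat_entries(schedule_path):
    """Return the sorted names of the entries in a beat shelve file.

    Assumes the entries live under the 'entries' key (PersistentScheduler layout).
    """
    with shelve.open(schedule_path, flag="r") as db:
        return sorted(db.get("entries", {}))


def remove_beat_entry(schedule_path, name):
    """Delete the entry called `name`; return True if it existed."""
    with shelve.open(schedule_path) as db:
        entries = db.get("entries", {})
        if name not in entries:
            return False
        del entries[name]
        # Reassign so the modified dict is pickled back into the shelve
        db["entries"] = entries
        return True
```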
Tristan Brown answered Sep 30 '22 07:09
