
Django Celery: Execute only one instance of a long-running process

I have a long-running process that must run every five minutes, but more than one instance of the process should never run at the same time. The process should not normally run past five minutes, but I want to be sure that a second instance does not start if it runs over.

Per a previous recommendation, I'm using Django Celery to schedule this long-running task.

I don't think a periodic task will work, because if I have a five minute period, I don't want a second task to execute if another instance of the task is running already.

My current experiment is as follows: at 8:55, an instance of the task starts to run. As the task finishes, it schedules another instance of itself to run at the next five-minute mark. So if the first task finishes at 8:57, the second task runs at 9:00. If the first task runs long and finishes at 9:01, it schedules the next instance for 9:05.

I've been struggling with a variety of cryptic errors when doing anything more than the simple example below, and I haven't found any other examples of people scheduling a task from a previous instance of itself. I'm wondering if there is a better approach to what I am trying to do. I know there's a way to name one's tasks; perhaps there's a way to search for running or scheduled instances with the same name? Does anyone have advice on running a task every five minutes while ensuring that only one instance runs at a time?

Thank you, Joe

In mymodule/tasks.py:

import datetime
from celery.decorators import task


@task
def test(run_periodically, frequency):
    run_long_process()
    now = datetime.datetime.now()
    # Run this task every x minutes, where x is an integer specified by frequency
    eta = (
        now - datetime.timedelta(
            minutes=now.minute % frequency,
            seconds=now.second,
            microseconds=now.microsecond,
        )
    ) + datetime.timedelta(minutes=frequency)
    task = test.apply_async(args=[run_periodically, frequency], eta=eta)

From a ./manage.py shell:

from mymodule import tasks
result = tasks.test.apply_async(args=[True, 5])
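The eta arithmetic in the task above can be checked in isolation, without Celery. This is a minimal sketch, assuming frequency divides 60 evenly (5, 10, 15, and so on); next_run_time is a hypothetical helper name used only for illustration, not part of Celery.

```python
import datetime


def next_run_time(now, frequency):
    """Return the next time whose minute is a multiple of `frequency`.

    Assumes `frequency` divides 60 evenly, so the marks line up every hour.
    """
    # Round down to the most recent mark by dropping the leftover minutes,
    # seconds, and microseconds.
    floored = now - datetime.timedelta(
        minutes=now.minute % frequency,
        seconds=now.second,
        microseconds=now.microsecond,
    )
    # Then step forward one full period to get the next mark.
    return floored + datetime.timedelta(minutes=frequency)
```

With a frequency of 5, a run that finishes at 8:57 is followed by one at 9:00, and a run that finishes at 9:01 is followed by one at 9:05, matching the schedule described in the question. A run that finishes exactly on a mark is scheduled for the following mark, not the current one.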
Joe J asked Jan 17 '12 23:01



2 Answers

You can use a periodic task paired with a lock that ensures only one instance of the task executes at a time. Here is a sample implementation from the Celery documentation:

http://ask.github.com/celery/cookbook/tasks.html#ensuring-a-task-is-only-executed-one-at-a-time

The method you describe, where each run schedules the next, has a weakness: if one run fails before it schedules its successor, the chain breaks and no further runs are scheduled.
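As a rough illustration of the lock idea, here is a minimal sketch. It relies on an "add if absent" cache operation, in the spirit of the linked recipe. LocalCache is a hypothetical in-memory stand-in so the example runs without Django or a cache server; in a real deployment the lock would have to live in a cache shared by all workers. The names task_lock and run_exclusively and the LOCK_EXPIRE value are likewise assumptions made for illustration.

```python
import threading
import time
from contextlib import contextmanager


class LocalCache:
    """Hypothetical in-memory stand-in for a shared cache with add/delete."""

    def __init__(self):
        self._data = {}
        self._mutex = threading.Lock()

    def add(self, key, value, timeout):
        """Store the key only if it is absent or expired; return True if stored."""
        now = time.monotonic()
        with self._mutex:
            entry = self._data.get(key)
            if entry is not None and entry[1] > now:
                return False
            self._data[key] = (value, now + timeout)
            return True

    def delete(self, key):
        with self._mutex:
            self._data.pop(key, None)


cache = LocalCache()
LOCK_EXPIRE = 10 * 60  # seconds; assumed safety net if a worker dies mid-run


@contextmanager
def task_lock(lock_id):
    """Try to take the lock; yield whether it was acquired, release on exit."""
    acquired = cache.add(lock_id, "locked", LOCK_EXPIRE)
    try:
        yield acquired
    finally:
        # Only the holder releases the lock, so a skipped run never frees it.
        if acquired:
            cache.delete(lock_id)


def run_exclusively(lock_id, work):
    """Run `work` unless the lock is already held; return True if it ran."""
    with task_lock(lock_id) as acquired:
        if not acquired:
            return False
        work()
        return True
```

In a periodic task, the body would be wrapped in something like run_exclusively("long-process-lock", run_long_process). A run that starts while the previous one still holds the lock returns immediately, and the scheduler tries again at the next five-minute mark, so one failed or slow run does not stop later ones.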

mher answered Sep 27 '22 16:09



I personally solve this by caching a flag under a key built from the task name and its arguments:

from functools import wraps

from django.core.cache import cache  # assumed: Django's cache framework


def prevent_run_duplicate(func):
    """
    Set a cache flag for a task called with specific args and hold it until
    the task completes. Any call that arrives with the same cache key while
    the flag is set is ignored, to avoid conflicts. The key is deleted from
    the cache once the task finishes.

    - cache keys expire after one day
    - meant for bound tasks (bind=True), so that `self` is the task instance
    """

    @wraps(func)
    def outer(self, *args, **kwargs):
        key = f"running_task_{self.name}_{args}"
        # cache.add() stores the key only if it is absent, so the check and
        # the set happen in one step instead of a get followed by a set
        # (atomic on backends such as Memcached).
        if not cache.add(key, True, 24 * 60 * 60):
            return
        try:
            return func(self, *args, **kwargs)
        finally:
            cache.delete(key)

    return outer

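This decorator prevents duplicate calls of a task with the same arguments while an earlier call is still running. Note that keyword arguments are not part of the cache key, and the wrapped task must be bound (bind=True) so that self.name is available.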

mujad answered Sep 27 '22 16:09
