
Django Celery Workflow Chain Pause/Resume

Is there any way to pause and resume a running workflow created using chains in Celery 3.0?

Basically, we have two different types of tasks in our system: interactive and non-interactive ones. We have all the parameters for the non-interactive ones up front, but the interactive ones need user input. Note that for the interactive tasks, we can only ask for user input once all the previous tasks in the chain have completed, as their results will affect the interactive tasks (i.e. we cannot ask for user input before creating the actual chain).

Any suggestions on how to approach this? I'm really at a loss here.

Current ideas:

  • Create two subclasses of Task (from celery import Task). Add an extra class member variable to the interactive subclass that is False by default, meaning that some user input is still needed. Then somehow get access to the Task instance and set the variable to True from outside the Celery worker (though I have looked into this quite a bit, and it doesn't seem possible to access Task objects directly from another module).
  • Partition the chain into multiple chains delimited by interactive tasks. Have some mechanism outside the Celery worker detect when a chain has reached its end and trigger the interactive task's client-side component. Once the user has entered all the data, collect it and start the next chain, with the interactive task at its head.
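
The partitioning step in the second idea could look roughly like the sketch below. It is only an illustration in plain Python: `Step` and `partition_steps` are hypothetical names, and how each segment is turned into an actual Celery chain is left out.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Step:
    name: str          # hypothetical task name
    interactive: bool  # True if the step needs user input before it can run


def partition_steps(steps: List[Step]) -> List[List[Step]]:
    """Split steps into segments; every interactive step starts a new segment."""
    segments: List[List[Step]] = []
    for step in steps:
        # Open a new segment for the very first step and for each interactive step.
        if step.interactive or not segments:
            segments.append([])
        segments[-1].append(step)
    return segments


if __name__ == "__main__":
    workflow = [
        Step("fetch", False),
        Step("parse", False),
        Step("confirm", True),
        Step("export", False),
    ]
    for segment in partition_steps(workflow):
        print([step.name for step in segment])
```

Each segment would then be built into its own chain (for example with `celery.chain`), and a segment headed by an interactive step would only be started once the user's input is available.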
Anton K asked Feb 15 '13 01:02


1 Answer

We have implemented something like your second idea in our project, and it works fine. Here is the gist of the implementation.

Add a new status field to your model and override its save method.

models.py:

from django.db import models


class My_Model(models.Model):
    # some fields
    status = models.IntegerField(default=0)

    def save(self, *args, **kwargs):
        super(My_Model, self).save(*args, **kwargs)
        # Imported inside the method, presumably to avoid a circular import.
        from .functions import custom_func
        custom_func(self.status)

tasks.py:

# `celery` is assumed to be your Celery app instance.
@celery.task()
def non_interactive_task():
    # do something.
    pass

@celery.task()
def interactive_task():
    # do something.
    pass

functions.py:

def custom_func(status):
    # Change status after the non-interactive task is completed.
    # Based on status, start the interactive task.
    pass

Pass the status variable to the template, where it controls whether the UI element for entering the required information is displayed. When the user enters that information, change the status and save the model. Saving calls custom_func, which triggers your interactive_task.
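
A minimal sketch of what the dispatch inside custom_func might look like, assuming four status values. The constants and the extra `start_interactive` parameter are made up for illustration; the original only says that status is an integer. The callback stands in for something like `interactive_task.delay`, so the sketch runs without a Celery worker.

```python
# Hypothetical status values; not part of the original answer.
WAITING_FOR_CHAIN = 0    # non-interactive tasks are still running
WAITING_FOR_USER = 1     # chain finished, the UI should ask for input
INPUT_RECEIVED = 2       # the user has submitted the required information
INTERACTIVE_STARTED = 3  # the interactive task has been queued


def custom_func(status, start_interactive):
    """Return the next status, queuing the interactive task once input has arrived.

    `start_interactive` is a hypothetical stand-in for a call such as
    `interactive_task.delay(...)`.
    """
    if status == INPUT_RECEIVED:
        start_interactive()
        return INTERACTIVE_STARTED
    # Any other status: nothing to start yet (or it was already started).
    return status


if __name__ == "__main__":
    queued = []
    print(custom_func(INPUT_RECEIVED, lambda: queued.append("interactive_task")))
    print(queued)
```

Moving to a separate "started" status means a later save of the same row does not queue the interactive task a second time. If the new status is written back from within save, something like `My_Model.objects.filter(pk=self.pk).update(status=...)` would avoid re-entering save.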

Pandikunta Anand Reddy answered Sep 23 '22 12:09
