Synopsis
I'm developing a web application to learn Django (Python 3.4 and Django 1.6.10). The web app has complex, frequently updated workflows. I decided to integrate the Django-Viewflow library (https://github.com/viewflow/viewflow/) because it seems a convenient way to handle workflows while keeping the workflow logic out of the application models.
In this case, I have created a workflow to collect authorship information and copyrights using the Django-Viewflow library. The workflow should be initiated each time an author is added to a book.
My Problem
The documentation offers step-by-step guidelines for integrating an end-to-end workflow solution (frontend and backend). My problem is that I'm having a hard time controlling the workflow programmatically (specifically from the Book model).
Description of the application
I have a Book model (the core model) with a many-to-many relationship to Author.
myApp/models.py
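    from django.db import models

    # Author and Publisher are other models of the application (not shown).
    class Book(models.Model):
        title = models.CharField(max_length=100)
        authors = models.ManyToManyField(Author)
        publisher = models.ForeignKey(Publisher)
        publication_date = models.DateField()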
The workflow components are:
myFlow/models.py
    from django.db import models
    from viewflow.models import Process

    class AuthorInvitation(Process):
        consent_confirmed = models.BooleanField(default=False)
        signature = models.CharField(max_length=150)
myFlow/flows.py
    from viewflow import flow
    from viewflow.base import this, Flow
    from viewflow.contrib import celery
    from viewflow.views import StartProcessView, ProcessView

    from . import models, tasks

    class AuthorInvitationFlow(Flow):
        process_cls = models.AuthorInvitation

        # Started by a user through the standard start view.
        start = flow.Start(StartProcessView) \
            .Permission(auto_create=True) \
            .Next(this.notify)

        # Background job that sends the authorship request.
        notify = celery.Job(tasks.send_authorship_request) \
            .Next(this.approve)

        # Field names must match the AuthorInvitation model.
        approve = flow.View(ProcessView, fields=["consent_confirmed", "signature"]) \
            .Permission(auto_create=True) \
            .Next(this.check_approve)

        check_approve = flow.If(cond=lambda p: p.consent_confirmed) \
            .OnTrue(this.send) \
            .OnFalse(this.end)

        send = celery.Job(tasks.send_authorship) \
            .Next(this.end)

        end = flow.End()
Question
How can you control the workflow process programmatically (activate it, confirm steps, redo steps, cancel the process, and so on)? I have tried to dig into the library's code. The Activation class seems to contain the right methods, but I'm not sure how the whole thing should be orchestrated.
Thanks in advance!
There are two additional built-in Start tasks available for flows.
StartFunction starts the flow when the function is called somewhere in your code:
    @flow_start_func
    def create_flow(activation, **kwargs):
        activation.prepare()
        activation.done()
        return activation

    class FunctionFlow(Flow):
        start = flow.StartFunction(create_flow) \
            .Next(this.end)

    # somewhere in the code
    FunctionFlow.start.run(**some_kwargs)
StartSignal starts the flow when a Django signal is received:
    class SignalFlow(Flow):
        start = flow.StartSignal(some_signal, create_flow) \
            .Next(this.end)
You can check how these and the rest of the built-in tasks are used in the viewflow test suite.
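To tie this back to the question's requirement (start the workflow each time an author is added to a book), one option is to react to Django's m2m_changed signal, which is sent with action "post_add" and a pk_set of the added primary keys. The sketch below keeps the dispatch logic in plain Python so it runs without Django. Note that start_flow is a hypothetical stand-in for a call such as FunctionFlow.start.run, and the keyword names book and author_id are assumptions, not part of viewflow.

```python
def make_author_added_handler(start_flow):
    """Build an m2m_changed-style receiver that starts one flow per added author.

    start_flow is a hypothetical callable; in a real project it might wrap
    something like FunctionFlow.start.run(book=..., author_id=...).
    """
    def handler(sender, instance, action, pk_set=None, **kwargs):
        # Django sends several actions (pre_add, post_add, pre_remove, ...);
        # only react once the new relations have been saved.
        if action != "post_add" or not pk_set:
            return []
        return [start_flow(book=instance, author_id=pk) for pk in sorted(pk_set)]
    return handler


# Stand-in for the real flow start, so the sketch runs on its own.
started = []

def fake_start_flow(book, author_id):
    started.append((book, author_id))
    return (book, author_id)


handler = make_author_added_handler(fake_start_flow)
handler(sender=None, instance="book-1", action="pre_add", pk_set={3, 7})   # ignored
handler(sender=None, instance="book-1", action="post_add", pk_set={3, 7})  # starts two flows
print(started)
```

In a Django project the handler would be registered with m2m_changed.connect(handler, sender=Book.authors.through). This assumes authors are always added from the Book side; when the relation is modified from the Author side, the signal is sent with reverse=True and instance is the Author instead.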
To process the task state manually, first get the task from the database, activate it, and then call any activation method.
    task = MyFlow.task_cls.objects.get(...)
    activation = task.activate()
    if activation.undo.can_proceed():
        activation.undo()
Every activation transition has a .can_proceed() method, which lets you check whether the task is in a state that allows the transition.
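The guard-then-call pattern can be illustrated with a small, library-independent sketch. This is not viewflow's implementation: the classes Transition and ToyActivation and the state names "NEW" and "DONE" are made up for illustration. It only shows why checking can_proceed() before calling a transition avoids an error.

```python
class TransitionNotAllowed(Exception):
    """Raised when a transition is called from the wrong state."""


class Transition:
    """A callable transition bound to a task, with a can_proceed() guard."""

    def __init__(self, task, source, target):
        self.task = task
        self.source = source
        self.target = target

    def can_proceed(self):
        # The transition is only legal from its declared source state.
        return self.task.status == self.source

    def __call__(self):
        if not self.can_proceed():
            raise TransitionNotAllowed(
                "cannot go from %s to %s" % (self.task.status, self.target))
        self.task.status = self.target


class ToyActivation:
    """Toy stand-in for an activation; the state names are hypothetical."""

    def __init__(self):
        self.status = "NEW"
        self.done = Transition(self, source="NEW", target="DONE")
        self.undo = Transition(self, source="DONE", target="NEW")


activation = ToyActivation()
print(activation.undo.can_proceed())  # nothing has been done yet, so undo is not allowed
activation.done()
if activation.undo.can_proceed():
    activation.undo()
print(activation.status)
```

Calling a transition without the guard raises TransitionNotAllowed in this sketch, which is the kind of failure the can_proceed() check is meant to prevent.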