Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

viewflow.io: implementing a queue task

I would like to implement the following use case with the ViewFlow library:

Problem

Processes of a particular Flow, started by a user, must wait in a queue before executing a celery job. Each user has a queue of these processes. Based on a schedule, or triggered manually, the next process in the queue is allowed to proceed.

Example

A node within my flow enters a named queue. Other logic within the application determines, for each queue, when to allow the next task to proceed. The next task in the queue is selected and its activation's done() method called.

An example flow might look like this:

class MyFlow(Flow):

    start = flow.Start(...).Next(queue_wait)
    queue_wait = QueueWait("myQueue").Next(job)
    job = celery.Job(...).Next(end)
    end = flow.End()

Question

What would be the best approach to implement queueing? In the above example, I don't know what "QueueWait" should be.

I've read through the docs and viewflow code, but it's not yet clear to me if this can be done using built-in Node and Activation classes, such as func.Function, or if I need to extend with custom classes.

like image 435
Martin Avatar asked Sep 28 '22 05:09

Martin


1 Answers

After much experimentation, I arrived at a workable and simple solution:

from viewflow.flow import base
from viewflow.flow.func import FuncActivation
from viewflow.activation import STATUS


class Queue(base.NextNodeMixin,
            base.UndoViewMixin,
            base.CancelViewMixin,
            base.DetailsViewMixin,
            base.Event):

    """
    Node that halts the flow and waits in a queue. To process the next waiting task
    call the dequeue method, optionally specifying the task owner.

    Example placing a job in a queue::

        class MyFlow(Flow):
            wait = Queue().Next(this.job)
            job = celery.Job(send_stuff).Next(this.end)
            end = flow.End()

        somewhere in the application code:
        MyFlow.wait.dequeue()
        or:
        MyFlow.wait.dequeue(process__myprocess__owner=user)

    Queues are logically separated by the task_type, so new queues defined in a
    subclass by overriding task_type attribute.
    """

    task_type = 'QUEUE'
    activation_cls = FuncActivation

    def __init__(self, **kwargs):
        super(Queue, self).__init__(**kwargs)

    def dequeue(self, **kwargs):
        """
        Process the next task in the queue by created date/time. kwargs is
        used to add task filter arguments, thereby effectively splitting the queue
        into subqueues. This could be used to implement per-user queues.

        Returns True if task was found and dequeued, False otherwise
        """
        filter_kwargs = {'flow_task_type': self.task_type, 'status': STATUS.NEW}
        if kwargs is not None:
            filter_kwargs.update(kwargs)

        task = self.flow_cls.task_cls.objects.filter(**filter_kwargs).order_by('created').first()
        if task is not None:
            lock = self.flow_cls.lock_impl(self.flow_cls.instance)
            with lock(self.flow_cls, task.process_id):
                task = self.flow_cls.task_cls._default_manager.get(pk=task.pk)
                activation = self.activation_cls()
                activation.initialize(self, task)
                activation.prepare()
                activation.done()
            return True

        return False

I tried to make it as generic as possible and support the definition of multiple named queues as well as sub-queues, such as per-user queues.

like image 116
Martin Avatar answered Oct 21 '22 02:10

Martin