
Parallel processing of a DAG

I'm trying hard to figure out how I can process a directed acyclic graph in parallel. Each node should only be able to "execute" when all its input nodes have been processed beforehand. Imagine a class Task with the following interface:

class Task(object):
    result = None
    def inputs(self):
        ''' List all requirements of the task. '''
        return ()
    def run(self):
        pass

I cannot think of a way to process the graph represented by this structure asynchronously, with a maximum number of workers running at the same time, other than the following approach.

I think the optimal processing would be achieved by creating a thread for each task, each of which waits for all of its inputs to be processed. However, spawning a thread for every task up front, rather than only when the task is ready to be processed, does not sound like a good idea to me.

import threading

class Runner(threading.Thread):
    def __init__(self, task):
        super(Runner, self).__init__()
        self.task = task
        self.start()

    def run(self):
        # Start one thread per input, then wait for all of them to finish.
        threads = [Runner(r) for r in self.task.inputs()]
        for t in threads:
            t.join()
        self.task.run()

Is there a better way to achieve this behaviour? Also, this approach currently has no way to limit the number of tasks running at a time.

Niklas R asked Oct 20 '22 21:10


2 Answers

Have one master thread push items to a queue once they are ready to be processed. Then have a pool of workers listen on the queue for tasks to work on. (Python provides a synchronized queue in the Queue module, renamed to lower-case queue in Python 3.)

The master first creates a map from dependencies to dependent tasks. Every task that doesn't have any dependencies can go into the queue. Every time a task is completed, the master uses the dictionary to figure out which dependent tasks there are, and puts them into the queue if all their dependencies are now met.
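For illustration, here is a minimal Python 3 sketch of the master/worker scheme described above. It is one possible reading of the idea, not the answerer's own code. The names run_dag, num_workers and DemoTask are made up for this example; DemoTask is only a stand-in for the question's Task interface (inputs() and run()). The sketch assumes that every input of every task is itself passed in `tasks` and that task objects are hashable, which is the default for plain classes.

```python
import queue
import threading
from collections import defaultdict


class DemoTask:
    """Hypothetical stand-in for the Task interface from the question."""

    def __init__(self, name, deps=()):
        self.name = name
        self.deps = tuple(deps)
        self.result = None

    def inputs(self):
        return self.deps

    def run(self):
        # Raises TypeError if any input has not produced a result yet.
        self.result = "%s(%s)" % (self.name, ",".join(d.result for d in self.deps))


def run_dag(tasks, num_workers=4):
    """Run each task once, only after all of its inputs() have finished."""
    dependents = defaultdict(list)  # dependency -> tasks waiting on it
    remaining = {}                  # task -> number of unfinished inputs
    for task in tasks:
        deps = list(task.inputs())
        remaining[task] = len(deps)
        for dep in deps:
            dependents[dep].append(task)

    ready = queue.Queue()  # master -> workers: tasks that may run now
    done = queue.Queue()   # workers -> master: (task, exception or None)

    def worker():
        while True:
            task = ready.get()
            if task is None:  # sentinel: no more work
                return
            try:
                task.run()
                done.put((task, None))
            except Exception as exc:
                done.put((task, exc))

    # The pool size is what limits how many tasks run at the same time.
    workers = [threading.Thread(target=worker) for _ in range(num_workers)]
    for w in workers:
        w.start()

    in_flight = finished = 0
    try:
        # Tasks without dependencies can be queued immediately.
        for task, count in remaining.items():
            if count == 0:
                ready.put(task)
                in_flight += 1
        while in_flight:
            task, exc = done.get()
            in_flight -= 1
            if exc is not None:
                raise exc
            finished += 1
            # Queue every dependent whose last open dependency just finished.
            for child in dependents[task]:
                remaining[child] -= 1
                if remaining[child] == 0:
                    ready.put(child)
                    in_flight += 1
    finally:
        for _ in workers:
            ready.put(None)
        for w in workers:
            w.join()
    if finished != len(remaining):
        raise ValueError("cycle in graph or input missing from `tasks`")
```

With a diamond-shaped graph such as a = DemoTask("a"), b = DemoTask("b", [a]), c = DemoTask("c", [a]), d = DemoTask("d", [b, c]), calling run_dag([d, c, b, a], num_workers=2) runs a first, then b and c (possibly in parallel), then d. Only the master thread touches the bookkeeping dictionaries, so no extra locking is needed beyond the two queues.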

Sven Marnach answered Nov 03 '22 01:11


Celery (http://www.celeryproject.org/) is a widely used distributed task queue for Python. It should be able to help you with this.

greavg answered Nov 03 '22 01:11