
How to design an async pipeline pattern in Python

I am trying to design an async framework that makes it easy to build a data processing pipeline. The pipeline is composed of several functions; input data goes in at one end of the pipeline and comes out at the other end.

I want to design the pipeline in a way that:

  1. Additional functions can be inserted into the pipeline.
  2. Functions already in the pipeline can be popped out.

Here is what I came up with:

import asyncio

@asyncio.coroutine
def add(x):
    return x + 1

@asyncio.coroutine
def prod(x):
    return x * 2

@asyncio.coroutine
def power(x):
    return x ** 3

def connect(funcs):
    # chain the coroutines: the output of each one becomes the input of the next
    def wrapper(*args, **kwargs):
        data_out = yield from funcs[0](*args, **kwargs)
        for func in funcs[1:]:
            data_out = yield from func(data_out)
        return data_out
    return wrapper

pipeline = connect([add, prod, power])
input = 1
output = asyncio.get_event_loop().run_until_complete(pipeline(input))
print(output)

This works, of course, but the problem is that if I want to add another function to (or pop a function out of) this pipeline, I have to disassemble it and reconnect every function again.
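One workaround is to keep the coroutines in a plain list and compose them lazily at call time, so the chain always reflects the current list. Here is a rough sketch of that idea, written with the newer async/await syntax rather than the @asyncio.coroutine decorator (the Pipeline class is purely illustrative):

import asyncio

class Pipeline:
    """Holds the stage functions in a plain list, so stages can be inserted or popped."""

    def __init__(self, *funcs):
        self.funcs = list(funcs)

    def insert(self, index, func):
        self.funcs.insert(index, func)

    def pop(self, index=-1):
        return self.funcs.pop(index)

    async def __call__(self, data):
        # the chain is composed at call time, so it always reflects the current list
        for func in self.funcs:
            data = await func(data)
        return data

async def add(x):
    return x + 1

async def prod(x):
    return x * 2

pipeline = Pipeline(add, prod)
print(asyncio.run(pipeline(1)))    # (1 + 1) * 2 = 4
pipeline.pop()                     # drop prod
print(asyncio.run(pipeline(1)))    # 1 + 1 = 2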

I would like to know if there is a better scheme or design pattern to create such a pipeline?

asked Mar 07 '16 at 01:03 by shelper

2 Answers

I've done something similar before, using just the multiprocessing library. It's a bit more manual, but it gives you the ability to easily create and modify your pipeline, as you've requested in your question.

The idea is to create functions that can live in a multiprocessing pool and whose only arguments are an input queue and an output queue. You tie the stages together by passing them different queues: each stage receives work on its input queue, does its work, and passes the result to the next stage through its output queue.

The workers loop, blocking on their input queue; when they get an item, they do their work and put the result on the queue for the next stage. The whole run ends by passing a "poison pill" through the pipeline, which causes every stage to exit.

This example just builds a string in multiple work stages:

import multiprocessing as mp                                              

POISON_PILL = "STOP"                                                      

def stage1(q_in, q_out):                                                  

    while True:

        # get either work or a poison pill from the previous stage (or main)
        val = q_in.get()                                                  

        # check to see if we got the poison pill - pass it along if we did
        if val == POISON_PILL:                                            
            q_out.put(val)                                                
            return                                                        

        # do stage 1 work                                                                  
        val = val + "Stage 1 did some work.\n"

        # pass the result to the next stage
        q_out.put(val)                                                    

def stage2(q_in, q_out):                                                  

    while True:                                                           

        val = q_in.get()                                                  
        if val == POISON_PILL:                                            
            q_out.put(val)                                                
            return                                                        

        val = val + "Stage 2 did some work.\n"                            
        q_out.put(val)                                                    

def main():                                                               

    pool = mp.Pool()                                                      
    manager = mp.Manager()                                                

    # create managed queues                                               
    q_main_to_s1 = manager.Queue()                                        
    q_s1_to_s2 = manager.Queue()                                          
    q_s2_to_main = manager.Queue()                                        

    # launch workers, passing them the queues they need                   
    results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2))     
    results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main))     

    # Send a message into the pipeline                                    
    q_main_to_s1.put("Main started the job.\n")                           

    # Wait for work to complete                                           
    print(q_s2_to_main.get()+"Main finished the job.")                    

    q_main_to_s1.put(POISON_PILL)                                         

    pool.close()                                                          
    pool.join()                                                           

    return                                                                

if __name__ == "__main__":                                                
    main()

The code produces this output:

Main started the job.
Stage 1 did some work.
Stage 2 did some work.
Main finished the job.

You can easily put more stages in the pipeline or rearrange them just by changing which functions get which queues. I'm not very familiar with the asyncio module, so I can't speak to what capabilities you would be losing by using the multiprocessing library instead, but this approach is very straightforward to implement and understand, so I like its simplicity.
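For instance, here is a rough sketch of that idea: build the stages from a list and create the intermediate queues automatically, so adding, removing, or reordering a stage only means editing the list (the build_pipeline helper and the parameterised stage function below are illustrative, not part of the code above):

import multiprocessing as mp

POISON_PILL = "STOP"

def stage(label, q_in, q_out):
    # a generic worker; the label only changes the message it appends
    while True:
        val = q_in.get()
        if val == POISON_PILL:
            q_out.put(val)
            return
        q_out.put(val + "Stage {} did some work.\n".format(label))

def build_pipeline(pool, manager, labels):
    # one queue per link: queues[0] is the inlet, queues[-1] the outlet
    queues = [manager.Queue() for _ in range(len(labels) + 1)]
    for label, q_in, q_out in zip(labels, queues, queues[1:]):
        pool.apply_async(stage, (label, q_in, q_out))
    return queues[0], queues[-1]

if __name__ == "__main__":
    labels = ["1", "2", "3"]        # add, remove, or reorder stages by editing this list
    pool = mp.Pool(len(labels))     # one worker per stage, since each stage loops until the pill arrives
    manager = mp.Manager()

    q_in, q_out = build_pipeline(pool, manager, labels)

    q_in.put("Main started the job.\n")
    print(q_out.get() + "Main finished the job.")

    q_in.put(POISON_PILL)
    pool.close()
    pool.join()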

answered Nov 06 '22 by skrrgwasme


I don't know if it is the best way to do it, but here is my solution.

While I think it's possible to control a pipeline using a list or a dictionary, I found it easier and more efficient to use a generator.

Consider the following generator:

def controller():
    old = value = None
    while True:
        new = (yield value)
        value = old
        old = new

This is basically a one-element queue: it stores the value that you send it and releases it on the next call of send (or next).

Example:

>>> c = controller()
>>> next(c)           # prime the generator
>>> c.send(8)         # send a value
>>> next(c)           # pull the value from the generator
8

By associating every coroutine in the pipeline with its controller, we get an external handle that we can use to set the target of each one. We just need to define our coroutines so that they pull their current target from the controller on every cycle.

Now consider the following coroutines:

def source(controller):
    while True:
        target = next(controller)
        print("source sending to", target.__name__) 
        yield (yield from target)

def add():
    return (yield) + 1

def prod():
    return (yield) * 2

The source is a coroutine that does not return, so it will not terminate itself after the first cycle. The other coroutines are "sinks" and do not need a controller. You can use these coroutines in a pipeline as in the following example: we initially set up the route source --> add, and after receiving the first result we change the route to source --> prod.

# create a controller for the source and prime it 
cont_source = controller()
next(cont_source)

# create three coroutines
# associate the source with its controller
coro_source = source(cont_source)
coro_add = add()
coro_prod = prod()

# create a pipeline
cont_source.send(coro_add)

# prime the source and send a value to it
coro_source.send(None)
print("add =", coro_source.send(4))

# change target of the source
cont_source.send(coro_prod)

# reset the source, send another value
coro_source.send(None)
print("prod =", coro_source.send(8))

Output:

source sending to add
add = 5
source sending to prod
prod = 16

answered Nov 06 '22 by user288431