I am trying to design an async framework that makes it easy to build a data processing pipeline. The pipeline is composed of several functions. Input data goes in at one end of the pipeline and comes out at the other end.
I want to design the pipeline in a way that:
Here is what I came up with:
import asyncio

async def add(x):
    return x + 1

async def prod(x):
    return x * 2

async def power(x):
    return x ** 3

def connect(funcs):
    # Chain the coroutines: the result of each stage feeds the next one.
    async def wrapper(*args, **kwargs):
        data_out = await funcs[0](*args, **kwargs)
        for func in funcs[1:]:
            data_out = await func(data_out)
        return data_out
    return wrapper

pipeline = connect([add, prod, power])
input = 1
output = asyncio.run(pipeline(input))
print(output)
This works, of course, but the problem is that if I want to add another function into (or pop out a function from) this pipeline, I have to disassemble and reconnect every function again.
I would like to know if there is a better scheme or design pattern to create such a pipeline?
I've done something similar before, using just the multiprocessing library. It's a bit more manual, but it gives you the ability to easily create and modify your pipeline, as you've requested in your question.
The idea is to create functions that can live in a multiprocessing pool, and their only arguments are an input queue and an output queue. You tie the stages together by passing them different queues. Each stage receives some work on its input queue, does some more work, and passes the result out to the next stage through its output queue.
The workers block on their input queues, and when they get something, they do their work and pass the result to the next stage. All of the work ends by passing a "poison pill" through the pipeline, causing all stages to exit.
This example just builds a string in multiple work stages:
import multiprocessing as mp

POISON_PILL = "STOP"

def stage1(q_in, q_out):
    while True:
        # get either work or a poison pill from the previous stage (or main)
        val = q_in.get()

        # check to see if we got the poison pill - pass it along if we did
        if val == POISON_PILL:
            q_out.put(val)
            return

        # do stage 1 work
        val = val + "Stage 1 did some work.\n"

        # pass the result to the next stage
        q_out.put(val)

def stage2(q_in, q_out):
    while True:
        val = q_in.get()

        if val == POISON_PILL:
            q_out.put(val)
            return

        val = val + "Stage 2 did some work.\n"
        q_out.put(val)

def main():
    # one worker per stage, so both stages can block on their queues at once
    pool = mp.Pool(processes=2)
    manager = mp.Manager()

    # create managed queues
    q_main_to_s1 = manager.Queue()
    q_s1_to_s2 = manager.Queue()
    q_s2_to_main = manager.Queue()

    # launch workers, passing them the queues they need
    results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2))
    results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main))

    # Send a message into the pipeline
    q_main_to_s1.put("Main started the job.\n")

    # Wait for work to complete
    print(q_s2_to_main.get() + "Main finished the job.")

    # Shut the stages down, then wait for the workers to exit
    q_main_to_s1.put(POISON_PILL)
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()
The code produces this output:
Main started the job.
Stage 1 did some work.
Stage 2 did some work.
Main finished the job.
You can easily put more stages in the pipeline or rearrange them just by changing which functions get which queues. I'm not very familiar with the asyncio module, so I can't speak to what capabilities you would be losing by using the multiprocessing library instead, but this approach is very straightforward to implement and understand, so I like its simplicity.
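As a rough sketch of that rearranging, the queue wiring can be derived from a plain list of work functions, so adding or removing a stage is just a list operation. Note the assumptions: this version swaps in `threading` and `queue.Queue` instead of the multiprocessing pool above so that it stays short and self-contained, and `make_stage`, `build_pipeline` and `run` are hypothetical helper names, not part of any library. With a real `mp.Pool` the stage functions would also have to be picklable module-level functions, which the closures and lambdas used here are not.

```python
import queue
import threading

POISON_PILL = "STOP"

def make_stage(work):
    """Wrap a plain function in the get / check-pill / put loop."""
    def stage(q_in, q_out):
        while True:
            val = q_in.get()
            if val == POISON_PILL:
                q_out.put(val)  # pass the pill along so later stages exit too
                return
            q_out.put(work(val))
    return stage

def build_pipeline(works):
    """Create one queue between each pair of neighbours and start the stages."""
    queues = [queue.Queue() for _ in range(len(works) + 1)]
    threads = [
        threading.Thread(target=make_stage(work), args=(q_in, q_out))
        for work, q_in, q_out in zip(works, queues, queues[1:])
    ]
    for t in threads:
        t.start()
    return queues[0], queues[-1], threads

def run(works, message):
    """Push one message through the pipeline, then shut it down."""
    q_first, q_last, threads = build_pipeline(works)
    q_first.put(message)
    result = q_last.get()
    q_first.put(POISON_PILL)
    for t in threads:
        t.join()
    return result

stages = [
    lambda s: s + "Stage 1 did some work.\n",
    lambda s: s + "Stage 2 did some work.\n",
]
two = run(stages, "Main started the job.\n")

# Adding a stage in the middle only touches the list, not the wiring code.
stages.insert(1, lambda s: s + "Stage 1.5 did some work.\n")
three = run(stages, "Main started the job.\n")
```

Here the pipeline is rebuilt for every run, which keeps the sketch simple; a long-lived pipeline would keep the threads and queues around and only send the poison pill at shutdown.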
I don't know if it is the best way to do it, but here is my solution.
While I think it's possible to control a pipeline using a list or a dictionary, I found it easier and more efficient to use a generator.
Consider the following generator:
def controller():
    old = value = None
    while True:
        new = (yield value)
        value = old
        old = new
This is basically a one-element queue: it stores the value that you send it and releases it at the next call of send (or next).
Example:
>>> c = controller()
>>> next(c) # prime the generator
>>> c.send(8) # send a value
>>> next(c) # pull the value from the generator
8
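To make the one-call delay a bit more concrete, here is a small trace of what each call returns. It is only an illustration of the generator above; the `trace` list is a name I am introducing for the example.

```python
def controller():
    old = value = None
    while True:
        new = (yield value)
        value = old
        old = new

c = controller()

# Each call hands back whatever was sent on the previous call;
# next(c) behaves like c.send(None), so it stores nothing.
trace = [
    next(c),      # priming: nothing stored yet
    c.send("a"),  # stores "a", nothing to release yet
    c.send("b"),  # releases "a", stores "b"
    next(c),      # releases "b"
    next(c),      # nothing left to release
]
print(trace)
```

So a value sent in is always available exactly one call later, whether that call is another send or a plain next.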
By associating every coroutine in the pipeline with its own controller, we get an external handle that we can use to set the target of each one. We just need to define our coroutines so that they pull the new target from their controller on every cycle.
Now consider the following coroutines:
def source(controller):
    while True:
        target = next(controller)
        print("source sending to", target.__name__)
        yield (yield from target)

def add():
    return (yield) + 1

def prod():
    return (yield) * 2
The source is a coroutine that never returns, so it will not terminate itself after the first cycle. The other coroutines are "sinks" and do not need a controller.
You can use these coroutines in a pipeline as in the following example. We initially set up a route source --> add, and after receiving the first result we change the route to source --> prod.
# create a controller for the source and prime it
cont_source = controller()
next(cont_source)
# create three coroutines
# associate the source with its controller
coro_source = source(cont_source)
coro_add = add()
coro_prod = prod()
# create a pipeline
cont_source.send(coro_add)
# prime the source and send a value to it
coro_source.send(None)
print("add =", coro_source.send(4))
# change target of the source
cont_source.send(coro_prod)
# reset the source, send another value
coro_source.send(None)
print("prod =", coro_source.send(8))
Output:
source sending to add
add = 5
source sending to prod
prod = 16
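The three steps in that example (set the target, advance the source, send the value) repeat for every route change, so they could be folded into a small helper. This is only a sketch: `run_once` is a hypothetical name, and it assumes the sinks are one-shot generators as above, so it takes the generator function and creates a fresh sink on every call. The print inside source is left out to keep it short.

```python
def controller():
    old = value = None
    while True:
        new = (yield value)
        value = old
        old = new

def source(controller):
    while True:
        target = next(controller)
        yield (yield from target)

def add():
    return (yield) + 1

def prod():
    return (yield) * 2

def run_once(coro_source, cont_source, sink_factory, value):
    """Route the source to a fresh sink and push one value through it."""
    cont_source.send(sink_factory())  # store a new one-shot sink as the next target
    coro_source.send(None)            # source picks up the target and primes it
    return coro_source.send(value)    # the sink's return value comes back out

cont_source = controller()
next(cont_source)  # prime the controller
coro_source = source(cont_source)

results = [
    run_once(coro_source, cont_source, add, 4),
    run_once(coro_source, cont_source, prod, 8),
    run_once(coro_source, cont_source, add, 5),
]
print(results)
```

Passing the generator function rather than a generator object matters here: a sink that has already returned is exhausted and cannot be used as a target again.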