 

How do I form multiple pipelines with generators?

I'm using Python and I'm trying to find an elegant way to chain multiple generators together. For example, a root generator provides some kind of data, and each value is passed down to its "children" in a cascade; each child may in turn modify the object it receives. I could go this route:

for x in gen1:
    gen2(x)
    gen3(x)

But it's ugly and not elegant. I was thinking about a more functional way of doing things.

Warrior asked Mar 09 '23 05:03


2 Answers

You could turn the generators into coroutines so they can send values to, and receive values from, one another (using (yield) expressions). This gives each one a chance to change the values it receives, pass them along to the next generator/coroutine, or ignore them completely.
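To make the send()/(yield) mechanics concrete, here is a minimal sketch using two hypothetical coroutines, doubler and collector (neither appears in the answer's code). doubler modifies each value it receives and forwards it downstream; collector stores whatever reaches the end. Both are primed by hand with next() here.

```python
def doubler(target):
    """Hypothetical stage: double each received value and forward it to target."""
    while True:
        value = (yield)          # Pause here until a value is sent in.
        target.send(value * 2)   # Pass the modified value downstream.

def collector(results):
    """Hypothetical sink: append every received value to results."""
    while True:
        results.append((yield))

results = []
sink = collector(results)
next(sink)                # Prime: run up to the first (yield).
head = doubler(sink)
next(head)

for n in range(3):
    head.send(n)          # Each value flows head -> sink.

print(results)            # [0, 2, 4]
```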

Note that in the sample code below, I use a decorator named coroutine to "prime" the generator/coroutine functions. Priming runs each one up to its first yield expression, so it is ready to receive values via send(). The decorator is a slightly modified version of one shown in a YouTube video of a very illuminating talk titled A Curious Course on Coroutines and Concurrency, which Dave Beazley gave at PyCon 2009.
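Here's a sketch of why priming matters. A freshly created generator hasn't reached any yield yet, so sending it a non-None value raises TypeError. The decorator below is a variant of the priming decorator that also uses functools.wraps (an addition, not part of the talk's version) so the wrapper keeps the decorated function's name; echo is a hypothetical coroutine for illustration.

```python
import functools

def coroutine(func):
    """Prime a generator-based coroutine (variant using functools.wraps)."""
    @functools.wraps(func)  # Copy func's __name__ and docstring onto the wrapper.
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)  # Create the generator object.
        next(cr)                    # Run it up to its first (yield).
        return cr
    return start

def echo(results):
    """Hypothetical coroutine: append every value sent to it to results."""
    while True:
        results.append((yield))

# Unprimed: the generator hasn't reached a yield yet, so send(1) fails.
unprimed = echo([])
try:
    unprimed.send(1)
    unprimed_error = None
except TypeError as exc:
    unprimed_error = exc

# Primed via the decorator: send() works straight away.
received = []
primed = coroutine(echo)(received)
primed.send(1)
primed.send(2)
print(received)  # [1, 2]
```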

As the output below shows, a single send() to the head coroutine is enough to feed each value through every configured pipeline; the head effectively "multiplexes" it down each one. Since each sub-coroutine can do the same thing, it would be possible to set up an elaborate "tree" of processes.
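A "tree" like that can also be built from a few generic pieces. The sketch below assumes three hypothetical coroutines, broadcast, keep_if and collect (not from the answer), and reproduces the same even / divisible-by-4 / divisible-by-6 branching, but collects results into lists instead of printing them.

```python
def coroutine(func):
    """Prime a generator-based coroutine (same idea as the answer's decorator)."""
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)  # Run up to the first (yield).
        return cr
    return start

@coroutine
def broadcast(*targets):
    """Send every received value to each target coroutine."""
    while True:
        value = (yield)
        for target in targets:
            target.send(value)

@coroutine
def keep_if(predicate, target):
    """Forward only the values for which predicate(value) is true."""
    while True:
        value = (yield)
        if predicate(value):
            target.send(value)

@coroutine
def collect(results):
    """Leaf of the tree: append every received value to results."""
    while True:
        results.append((yield))

evens, fours, sixes = [], [], []

# root -> keep evens -> broadcast -> {all evens, multiples of 4, multiples of 6}
root = keep_if(lambda v: v % 2 == 0,
               broadcast(collect(evens),
                         keep_if(lambda v: v % 4 == 0, collect(fours)),
                         keep_if(lambda v: v % 6 == 0, collect(sixes))))

for value in range(17):
    root.send(value)  # One send() at the root reaches every branch.

print(evens)  # [0, 2, 4, 6, 8, 10, 12, 14, 16]
print(fours)  # [0, 4, 8, 12, 16]
print(sixes)  # [0, 6, 12]
```

Because every node only knows about the targets it was given, new branches can be added by nesting more keep_if/broadcast calls without touching the existing stages.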

import sys

def coroutine(func):
    """ Decorator to "prime" generators used as coroutines. """
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)  # Create the generator object.
        next(cr)                    # Run it up to its first yield.
        return cr
    return start

def pipe(name, value, divisor, coroutines):
    """ Utility function to send values to list of coroutines. """
    print('  {}: {} is divisible by {}'.format(name, value, divisor))
    for cr in coroutines:
        cr.send(value)

def this_func_name():
    """ Helper function that returns name of function calling it. """
    frame = sys._getframe(1)
    return frame.f_code.co_name


@coroutine
def gen1(*coroutines):
    while True:
        value = (yield)     # Receive values sent here via "send()".
        if value % 2 == 0:  # Only pipe even values.
            pipe(this_func_name(), value, 2, coroutines)

@coroutine
def gen2(*coroutines):
    while True:
        value = (yield)     # Receive values sent here via "send()".
        if value % 4 == 0:  # Only pipe values divisible by 4.
            pipe(this_func_name(), value, 4, coroutines)

@coroutine
def gen3(*coroutines):
    while True:
        value = (yield)     # Receive values sent here via "send()".
        if value % 6 == 0:  # Only pipe values divisible by 6.
            pipe(this_func_name(), value, 6, coroutines)

# Create and link together some coroutine pipelines.
g3 = gen3()
g2 = gen2()
g1 = gen1(g2, g3)

# Send values through both pipelines (g1 -> g2, and g1 -> g3) of coroutines.
for value in range(17):
    print('piping {}'.format(value))
    g1.send(value)

Output:

piping 0
  gen1: 0 is divisible by 2
  gen2: 0 is divisible by 4
  gen3: 0 is divisible by 6
piping 1
piping 2
  gen1: 2 is divisible by 2
piping 3
piping 4
  gen1: 4 is divisible by 2
  gen2: 4 is divisible by 4
piping 5
piping 6
  gen1: 6 is divisible by 2
  gen3: 6 is divisible by 6
piping 7
piping 8
  gen1: 8 is divisible by 2
  gen2: 8 is divisible by 4
piping 9
piping 10
  gen1: 10 is divisible by 2
piping 11
piping 12
  gen1: 12 is divisible by 2
  gen2: 12 is divisible by 4
  gen3: 12 is divisible by 6
piping 13
piping 14
  gen1: 14 is divisible by 2
piping 15
piping 16
  gen1: 16 is divisible by 2
  gen2: 16 is divisible by 4
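The question also mentions children that may modify the objects they receive. A minimal sketch of that, assuming a hypothetical transform stage and collect sink (not from the answer's code): each stage applies a function before forwarding, and catching GeneratorExit lets close() on the head shut down the whole chain.

```python
import inspect

def coroutine(func):
    """Prime a generator-based coroutine."""
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)  # Run up to the first (yield).
        return cr
    return start

@coroutine
def transform(func, target):
    """Apply func to each received value and send the result to target."""
    try:
        while True:
            value = (yield)
            target.send(func(value))
    except GeneratorExit:
        target.close()  # Shut down the rest of the pipeline too.

@coroutine
def collect(results):
    """Sink: append every received value to results."""
    while True:
        results.append((yield))

out = []
sink = collect(out)
head = transform(lambda v: v * 10, transform(str, sink))

for value in (1, 2, 3):
    head.send(value)

head.close()  # Propagates down: head -> str stage -> sink.

sink_state = inspect.getgeneratorstate(sink)
print(out)         # ['10', '20', '30']
print(sink_state)  # GEN_CLOSED
```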
martineau answered Mar 17 '23 06:03


A pipeline might look more like this:

for x in gen3(gen2(gen1())):
    print(x)

For example:

for i, x in enumerate(range(10)):
    print(i, x)
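The gen1/gen2/gen3 stages aren't defined in the question, so the sketch below assumes hypothetical ones: a source, a filter and a transform. Each stage takes an iterable and lazily pulls values from it, which is what makes the nested gen3(gen2(gen1())) form work.

```python
def gen1():
    """Hypothetical source stage: produce some data."""
    yield from range(10)

def gen2(values):
    """Hypothetical filter stage: keep only even values."""
    for v in values:
        if v % 2 == 0:
            yield v

def gen3(values):
    """Hypothetical transform stage: square each value."""
    for v in values:
        yield v * v

# Each stage pulls lazily from the one it wraps; nothing runs until iteration.
for x in gen3(gen2(gen1())):
    print(x)  # 0, 4, 16, 36, 64 (one per line)

result = list(gen3(gen2(gen1())))
```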

Nesting generators like this gives a single linear pipeline; it doesn't fork (or "tee") on its own. If you want multiple pipelines, you'll have to duplicate them: gen2(gen1()) and gen3(gen1()).
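Duplicating the pipelines means gen1() runs once per pipeline. For comparison, the standard library's itertools.tee() can split one iterator into independent branches so the source runs only once. A sketch with hypothetical gen1/gen2/gen3 stages (gen1 records each time it starts, so the single run is visible):

```python
import itertools

runs = []

def gen1():
    """Hypothetical source stage; records each time it starts running."""
    runs.append('started')
    yield from range(10)

def gen2(values):
    """Hypothetical stage: keep values divisible by 4."""
    for v in values:
        if v % 4 == 0:
            yield v

def gen3(values):
    """Hypothetical stage: keep values divisible by 6."""
    for v in values:
        if v % 6 == 0:
            yield v

# Fork a single gen1() into two independent iterators.
branch_a, branch_b = itertools.tee(gen1(), 2)
fours = list(gen2(branch_a))
sixes = list(gen3(branch_b))

print(fours)  # [0, 4, 8]
print(sixes)  # [0, 6]
print(runs)   # ['started']  (the source ran only once)
```

The trade-off is memory: tee() keeps every value that one branch has consumed but another hasn't yet, so fully draining branch_a before touching branch_b (as here) buffers the entire stream.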

Brent Washburne answered Mar 17 '23 05:03