I'm refactoring some pipeline code in Python.
Suppose we have a series of generator functions that we want to chain together to form a data processing pipeline.
Example:
#!/usr/bin/python
import itertools

def foo1(g):
    for i in g:
        yield i + 1

def foo2(g):
    for i in g:
        yield 10 + i

def foo3(g):
    for i in g:
        yield 'foo3:' + str(i)

res = foo3(foo2(foo1(range(0, 5))))
for i in res:
    print(i)
Output:
foo3:11
foo3:12
foo3:13
foo3:14
foo3:15
I do not think foo3(foo2(foo1(range(0, 5)))) is a pythonic way to achieve my pipeline goal. Especially when the number of stages in the pipeline is large.
I wish I could rewrite it like method chaining in jQuery, something similar to:
range(0, 5).foo1().foo2().foo3()
Or maybe
l = [range(0, 5), foo1, foo2, foo3]
res = runner.run(l)
But I'm new to generators and couldn't find a way to achieve this.
Any help will be welcome.
I sometimes like to use a left fold (called reduce in Python) for this type of situation:
from functools import reduce

def pipeline(*steps):
    # The first step is the source iterable; each later step wraps the result so far.
    return reduce(lambda x, y: y(x), steps)

res = pipeline(range(0, 5), foo1, foo2, foo3)
Or even better:
def compose(*funcs):
    # Apply funcs left to right, starting from the input x.
    return lambda x: reduce(lambda acc, f: f(acc), funcs, x)

p = compose(foo1, foo2, foo3)
res = p(range(0, 5))
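As a self-contained sketch of the compose approach, the snippet below repeats the question's generators and feeds the composed pipeline an infinite source. The use of itertools.count and itertools.islice is my own addition for illustration, not part of the answer above; it is only meant to show that the chained generators stay lazy.

```python
from functools import reduce
from itertools import count, islice

def foo1(g):
    for i in g:
        yield i + 1

def foo2(g):
    for i in g:
        yield 10 + i

def foo3(g):
    for i in g:
        yield 'foo3:' + str(i)

def compose(*funcs):
    # Apply funcs left to right, starting from the input x.
    return lambda x: reduce(lambda acc, f: f(acc), funcs, x)

p = compose(foo1, foo2, foo3)

# Nothing runs until items are requested, so an infinite source is fine
# as long as only a finite number of items is consumed.
first_three = list(islice(p(count()), 3))
print(first_three)  # ['foo3:11', 'foo3:12', 'foo3:13']

# The composed function can be reused on a fresh input.
all_five = list(p(range(0, 5)))
print(all_five)  # ['foo3:11', 'foo3:12', 'foo3:13', 'foo3:14', 'foo3:15']
```

Because compose returns an ordinary function, the same pipeline can be built once and applied to several inputs.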
I do not think foo3(foo2(foo1(range(0, 5)))) is a pythonic way to achieve my pipeline goal. Especially when the number of stages in the pipeline is large.
There is a fairly trivial, and in my opinion clear, way of chaining generators: assigning the result of each to a variable, where each can have a descriptive name.
range_iter = range(0, 5)
foo1_iter = foo1(range_iter)
foo2_iter = foo2(foo1_iter)
foo3_iter = foo3(foo2_iter)
for i in foo3_iter:
    print(i)
I prefer this to something that uses a higher-order function, e.g. reduce or similar:
In my real cases, each foo* generator function often needs its own additional parameters, which is tricky when using reduce.
In my real cases, the steps in the pipeline are not dynamic at runtime: it seems a bit odd/unexpected (to me) to have a pattern that seems more appropriate for a dynamic case.
A reduce is a bit inconsistent with how regular functions are typically written, where each is called explicitly and the result of each is passed to the call of the next. Yes, there is a bit of duplication, but I'm happy with "calling a function" being duplicated since (to me) it's really clear.
No need for an import: it uses core language features.
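To illustrate the point about per-step parameters, here is a minimal sketch of the explicit style where each stage takes its own arguments. The generator functions add and label and their parameters are hypothetical stand-ins for the foo* functions, invented only for this example.

```python
# Hypothetical stages: each one needs its own extra parameter.
def add(g, amount):
    for i in g:
        yield i + amount

def label(g, prefix):
    for i in g:
        yield prefix + str(i)

# Each intermediate generator gets a descriptive name, and each call
# passes exactly the parameters that stage needs.
numbers = range(0, 5)
incremented = add(numbers, amount=1)
shifted = add(incremented, amount=10)
labelled = label(shifted, prefix='foo3:')

result = list(labelled)
print(result)  # ['foo3:11', 'foo3:12', 'foo3:13', 'foo3:14', 'foo3:15']
```

Nothing is computed until list(labelled) consumes the final generator, so the intermediate variables cost nothing beyond the names themselves.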
Following up on your runner.run approach, let's define this utility function:
def recur(ops):
    return ops[0](recur(ops[1:])) if len(ops)>1 else ops[0]
As an example:
>>> ops = foo3, foo2, foo1, range(0, 5)
>>> list( recur(ops) )
['foo3:11', 'foo3:12', 'foo3:13', 'foo3:14', 'foo3:15']
To list the steps in pipeline order instead, recurse from the end of the sequence:
def backw(ops):
    return ops[-1](backw(ops[:-1])) if len(ops)>1 else ops[0]
For example:
>>> list( backw([range(0, 5), foo1, foo2, foo3]) )
['foo3:11', 'foo3:12', 'foo3:13', 'foo3:14', 'foo3:15']
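For the jQuery-style syntax mentioned in the question, one option is a small wrapper class. This is only a sketch: the class name Pipeline and its pipe method are hypothetical names chosen for this example, not an existing library API.

```python
def foo1(g):
    for i in g:
        yield i + 1

def foo2(g):
    for i in g:
        yield 10 + i

def foo3(g):
    for i in g:
        yield 'foo3:' + str(i)

class Pipeline:
    """Hypothetical wrapper that allows method-style chaining of generator functions."""

    def __init__(self, iterable):
        self._iterable = iterable

    def pipe(self, func, *args, **kwargs):
        # Wrap the current iterable with the next stage; extra arguments
        # are forwarded to that stage. Still lazy: nothing is consumed here.
        return Pipeline(func(self._iterable, *args, **kwargs))

    def __iter__(self):
        return iter(self._iterable)

res = Pipeline(range(0, 5)).pipe(foo1).pipe(foo2).pipe(foo3)
result = list(res)
print(result)  # ['foo3:11', 'foo3:12', 'foo3:13', 'foo3:14', 'foo3:15']
```

Since pipe forwards *args and **kwargs, a stage that needs its own parameters can be added as .pipe(stage, some_arg) without any change to the wrapper.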