Is it possible to "pipeline" consumption of a generator across multiple consumers?
For example, it's common to have code with this pattern:
def consumer1(iterator):
    for item in iterator:
        foo(item)

def consumer2(iterator):
    for item in iterator:
        bar(item)

myiter = list(big_generator())
v1 = consumer1(myiter)
v2 = consumer2(myiter)
In this case, multiple functions completely consume the same iterator, making it necessary to cache the iterator in a list. Since each consumer exhausts the iterator before the next one starts, itertools.tee is useless: it would end up buffering every item anyway.
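To illustrate why tee doesn't help here, a small sketch follows. The source() helper and the pulled list are made up for the demonstration and stand in for big_generator(); the only real API used is itertools.tee.

```python
import itertools


def source(n, pulled):
    """Hypothetical stand-in for big_generator(); records each item it produces."""
    for i in range(n):
        pulled.append(i)
        yield i


pulled = []
a, b = itertools.tee(source(5, pulled))

first = list(a)                # consumer 1 exhausts its copy
pulled_before_b = len(pulled)  # items produced before consumer 2 has read anything
second = list(b)               # consumer 2 is served entirely from tee's buffer
```

By the time the second consumer starts, the whole source has already been drained, so tee has had to hold on to every item, which is no better than calling list().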
I see code like this a lot and I always wish I could get the consumers to consume one item at a time in order instead of caching the entire iterator. E.g.:
consumer1 consumes myiter[0]
consumer2 consumes myiter[0]
consumer1 consumes myiter[1]
consumer2 consumes myiter[1]
If I were to make up a syntax, it would look like this:
c1_retval, c2_retval = iforkjoin(big_generator(), (consumer1, consumer2))
You can get close with threads or multiprocessing and tee'd iterators, but threads consume at different speeds, meaning that the value deque cached inside tee could get very large. The point here is not to exploit parallelism or to speed up tasks but to avoid caching large sections of the iterator.
It seems to me that this might be impossible without modifying the consumers, because the flow of control is in the consumer. However, when a consumer actually consumes the iterator, control passes into the iterator's next() method, so maybe it is possible to invert the flow of control somehow so that the iterator blocks the consumers one at a time until it can feed them all?
If this is possible, I'm not clever enough to see how. Any ideas?
With the limitation of not changing consumers' code (i.e. having a loop in them), you're left with only two options: itertools.tee, or an iterator with a buffer of size 1, which blocks serving item i+1 until item i has been served to all consumers.
There are no other options. You can't achieve all of the below, as they are contradictory:
The generated items must be stored somewhere if you want to reuse them.
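The blocking option with a one-item buffer could look roughly like the sketch below. This is only one possible way to do it, under some assumptions: every consumer runs in its own thread, the iforkjoin name is borrowed from the question's made-up syntax, and threading.Barrier is my choice of synchronization primitive rather than anything the answer prescribes. Exceptions raised inside a consumer are not propagated to the caller in this sketch.

```python
import threading

_DONE = object()  # sentinel marking the end of the source


def iforkjoin(iterable, consumers):
    """Run each consumer in its own thread, feeding all of them in lockstep."""
    source = iter(iterable)
    current = [None]  # the single buffered item shared by all consumers

    def advance():
        # Barrier action: runs in exactly one thread, once every consumer
        # has asked for the next item.
        current[0] = next(source, _DONE)

    barrier = threading.Barrier(len(consumers), action=advance)
    results = [None] * len(consumers)

    def lockstep():
        while True:
            barrier.wait()  # block until all consumers want the next item
            if current[0] is _DONE:
                return
            yield current[0]

    def run(index, consumer):
        items = lockstep()
        try:
            results[index] = consumer(items)
        finally:
            # A consumer that stops early keeps taking part in the barrier
            # (discarding items) so the other consumers are not blocked.
            for _ in items:
                pass

    threads = [
        threading.Thread(target=run, args=(i, c))
        for i, c in enumerate(consumers)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return tuple(results)


# Usage: the built-ins sum and max stand in for consumer1 and consumer2.
c1_retval, c2_retval = iforkjoin((n * n for n in range(100)), (sum, max))
```

Only one item is held at a time, at the cost of a thread per consumer and a synchronization point per item, so this trades speed for memory.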
If changing the consumers' code is acceptable, clearly @monkey's solution is the simplest and most straightforward.
Doesn't this work? Or does each consumer require the entire iterator, so that handing each item to both consumers like this won't work? If so, then I think you either have to create a copy or generate the list twice.
for item in big_generator():
    consumer1.handle_item(item)
    consumer2.handle_item(item)
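For completeness, here is a minimal sketch of what such push-style consumers might look like. The Summer and Maximum classes, their attributes, and the stand-in big_generator() are all made up for illustration; only the handle_item name comes from the loop above.

```python
class Summer:
    """Hypothetical consumer that keeps a running total."""

    def __init__(self):
        self.total = 0

    def handle_item(self, item):
        self.total += item


class Maximum:
    """Hypothetical consumer that tracks the largest item seen so far."""

    def __init__(self):
        self.largest = None

    def handle_item(self, item):
        if self.largest is None or item > self.largest:
            self.largest = item


def big_generator():
    # Stand-in for the real generator.
    yield from range(10_000)


consumer1 = Summer()
consumer2 = Maximum()

# Each item is handed to both consumers and then dropped; nothing is cached.
for item in big_generator():
    consumer1.handle_item(item)
    consumer2.handle_item(item)
```

The flow of control now lives in the single outer loop instead of inside each consumer, which is exactly why this requires changing the consumers' code.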