Pipeline an iterator to multiple consumers?

Tags:

python

Is it possible to "pipeline" consumption of a generator across multiple consumers?

For example, it's common to have code with this pattern:

def consumer1(iterator):
    for item in iterator:
        foo(item)

def consumer2(iterator):
    for item in iterator:
        bar(item)

myiter = list(big_generator())
v1 = consumer1(myiter)
v2 = consumer2(myiter)

In this case, multiple functions completely consume the same iterator, making it necessary to cache the iterator in a list. Since each consumer exhausts the iterator, itertools.tee is useless.

I see code like this a lot and I always wish I could get the consumers to consume one item at a time in order instead of caching the entire iterator. E.g.:

  1. consumer1 consumes myiter[0]
  2. consumer2 consumes myiter[0]
  3. consumer1 consumes myiter[1]
  4. consumer2 consumes myiter[1]
  5. etc...

If I were to make up a syntax, it would look like this:

c1_retval, c2_retval = iforkjoin(big_generator(), (consumer1, consumer2))

You can get close with threads or multiprocessing and teed iterators, but the threads consume at different speeds, meaning that the deque of values cached inside tee could grow very large. The point here is not to exploit parallelism or to speed up tasks but to avoid caching large sections of the iterator.
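
To make that concrete, here is roughly the threads-and-tee version I have in mind (tee_to_threads is just a name I'm making up): nothing keeps the threads in lockstep, so the buffer inside tee has to hold every item the fastest consumer has pulled but the slowest one hasn't.

import itertools
import threading

def tee_to_threads(iterable, consumers):
    # Give each consumer its own teed copy and its own thread. tee must
    # buffer every item the fastest consumer has seen but the slowest
    # has not, so that internal buffer can grow arbitrarily large.
    iterators = itertools.tee(iterable, len(consumers))
    threads = [threading.Thread(target=c, args=(it,))
               for c, it in zip(consumers, iterators)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

tee_to_threads(big_generator(), (consumer1, consumer2))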

It seems to me that this might be impossible without modifying the consumers, because the flow of control is in the consumer. However, when a consumer actually consumes the iterator, control passes into the iterator's next() method, so maybe it is possible to invert the flow of control somehow so that the iterator blocks the consumers one at a time until it can feed them all?

If this is possible, I'm not clever enough to see how. Any ideas?

Francis Avila asked Mar 24 '13


2 Answers

Given the limitation of not changing the consumers' code (i.e. each of them contains its own loop over the iterator), you're left with only two options:

  1. the approach you already include in your question: caching the generated items in memory, then iterating over them multiple times.
  2. running each consumer in a thread and implementing some kind of synchronized itertools.tee, one with a buffer of size 1, which blocks serving item i+1 until item i has been served to all consumers (a rough sketch of this appears below).

There are no other options. You can't achieve all of the below, as they are contradictory:

  1. having a generator
  2. having a loop to consume all of it
  3. then, serially (after the previous loop has finished), having another loop to consume all of it again
  4. only keeping O(1) items in memory (or disk, etc.) while consuming them
  5. not regenerating (i.e. not re-creating the generator)

The generated items must be stored somewhere if you want to reuse them.
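
If you do go the threads route, a minimal sketch of option 2 could look like this, assuming one bounded queue.Queue per consumer and a small generator wrapped around each queue so the consumers keep their plain for loops (iforkjoin, drain and run are names made up here):

import queue
import threading

_SENTINEL = object()

def iforkjoin(iterable, consumers):
    # One bounded queue per consumer: put() blocks once a consumer is
    # one item behind, so the feeder stays in lockstep with the slowest
    # consumer and total buffering is O(number of consumers).
    queues = [queue.Queue(maxsize=1) for _ in consumers]
    results = [None] * len(consumers)

    def drain(q):
        # Wrap the queue in a generator so each consumer can keep its
        # plain "for item in iterator" loop unchanged.
        while True:
            item = q.get()
            if item is _SENTINEL:
                return
            yield item

    def run(index, consumer, q):
        results[index] = consumer(drain(q))

    threads = [threading.Thread(target=run, args=(i, c, q))
               for i, (c, q) in enumerate(zip(consumers, queues))]
    for t in threads:
        t.start()
    for item in iterable:           # feed every consumer one item at a time
        for q in queues:
            q.put(item)             # blocks until that consumer took the previous item
    for q in queues:
        q.put(_SENTINEL)            # tell each consumer the iterator is exhausted
    for t in threads:
        t.join()
    return results

You'd call it as v1, v2 = iforkjoin(big_generator(), (consumer1, consumer2)); the main thread feeds items and each consumer stays within one buffered item of the feeder.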

If changing the consumers' code is acceptable, clearly @monkey's solution is the simplest and most straightforward.

shx2 answered Nov 17 '22

Doesn't this work? Or do the consumers need the whole iterator, so that handing them one item at a time like this won't work? If so, I think you either have to cache a copy or generate the list twice.

for item in big_generator():
    consumer1.handle_item(item)
    consumer2.handle_item(item)
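
For example, the consumer objects used in that loop could be as simple as this, with the question's per-item bodies moved into handle_item methods (the class names here are made up):

class Consumer1:
    def handle_item(self, item):
        foo(item)

class Consumer2:
    def handle_item(self, item):
        bar(item)

consumer1 = Consumer1()
consumer2 = Consumer2()

Any state or return value the original functions built up would live on these instances instead.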

ninMonkey answered Nov 17 '22