Imagine we have an iterator, say `iter(range(1, 1000))`, and two functions that each accept an iterator as their only parameter, say `sum()` and `max()`. In the SQL world we would call them aggregate functions.

Is there any way to obtain the results of both without buffering the iterator output? To do that, we would need to pause and resume the execution of the aggregate functions, in order to feed both of them the same values without storing them. Is there perhaps a way to express this using async facilities, without resorting to sleeps?
Let's consider how to apply two aggregate functions to the same iterator, which we can only exhaust once. The initial attempt (which hardcodes `sum` and `max` for brevity, but is trivially generalizable to an arbitrary number of aggregate functions) might look like this:
```python
def max_and_sum_buffer(it):
    content = list(it)
    p = sum(content)
    m = max(content)
    return p, m
```
This implementation has the downside that it stores all the generated elements in memory at once, despite both functions being perfectly capable of stream processing. The question anticipates this cop-out and explicitly requests the result to be produced without buffering the iterator output. Is it possible to do this?
It certainly seems possible. After all, Python iterators are external, so every iterator is already capable of suspending itself. How hard can it be to provide an adapter that splits an iterator into two new iterators that provide the same content? Indeed, this is exactly the description of `itertools.tee`, which appears perfectly suited to parallel iteration:
```python
import itertools

def max_and_sum_tee(it):
    it1, it2 = itertools.tee(it)
    p = sum(it1)  # XXX
    m = max(it2)
    return p, m
```
The above produces the correct result, but doesn't work the way we'd like it to. The trouble is that we're not iterating in parallel. Aggregate functions like `sum` and `max` never suspend - each insists on consuming all of the iterator content before producing the result. So `sum` will exhaust `it1` before `max` has had a chance to run at all. Exhausting elements of `it1` while leaving `it2` alone will cause those elements to be accumulated inside an internal FIFO shared between the two iterators. That's unavoidable here - since `max(it2)` must see the same elements, `tee` has no choice but to accumulate them. (For more interesting details on `tee`, refer to this post.)
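To see the buffering in action, note that after `sum` has exhausted `it1`, every element is still reachable through `it2` - which is only possible because `tee` kept them all in memory. A small illustration of my own, not part of the original answer:

```python
import itertools

it1, it2 = itertools.tee(iter(range(1000)))
total = sum(it1)    # exhausts it1; every element gets queued for it2
largest = max(it2)  # 999, served entirely from tee's internal FIFO
print(total, largest)
```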
In other words, there is no difference between the `tee`-based implementation and the first one, except that the first one at least makes the buffering explicit. To eliminate buffering, `sum` and `max` must run in parallel, not one after the other.
Let's see what happens if we run the aggregate functions in separate threads, still using `tee` to duplicate the original iterator:
```python
import itertools
import concurrent.futures

def max_and_sum_threads_simple(it):
    it1, it2 = itertools.tee(it)
    with concurrent.futures.ThreadPoolExecutor(2) as executor:
        sum_future = executor.submit(lambda: sum(it1))
        max_future = executor.submit(lambda: max(it2))
    return sum_future.result(), max_future.result()
```
Now `sum` and `max` actually run in parallel (as much as the GIL permits), threads being managed by the excellent `concurrent.futures` module. It has a fatal flaw, however: for `tee` not to buffer the data, `sum` and `max` must process their items at exactly the same rate. If one is even a little bit faster than the other, they will drift apart, and `tee` will buffer all intermediate elements. Since there is no way to predict how fast each will run, the amount of buffering is both unpredictable and has the nasty worst case of buffering everything.
To ensure that no buffering occurs, `tee` must be replaced with a custom generator that buffers nothing and blocks until all the consumers have observed the previous value before proceeding to the next one. As before, each consumer runs in its own thread, but now the calling thread is busy running a producer, a loop that actually iterates over the source iterator and signals that a new value is available. Here is an implementation:
```python
import threading
import concurrent.futures

def max_and_sum_threads(it):
    STOP = object()
    next_val = None
    consumed = threading.Barrier(2 + 1)  # 2 consumers + 1 producer
    val_id = 0
    got_val = threading.Condition()

    def send(val):
        nonlocal next_val, val_id
        consumed.wait()
        with got_val:
            next_val = val
            val_id += 1
            got_val.notify_all()

    def produce():
        for elem in it:
            send(elem)
        send(STOP)

    def consume():
        last_val_id = -1
        while True:
            consumed.wait()
            with got_val:
                got_val.wait_for(lambda: val_id != last_val_id)
            if next_val is STOP:
                return
            yield next_val
            last_val_id = val_id

    with concurrent.futures.ThreadPoolExecutor(2) as executor:
        sum_future = executor.submit(lambda: sum(consume()))
        max_future = executor.submit(lambda: max(consume()))
        produce()
        return sum_future.result(), max_future.result()
```
This is quite some amount of code for something so conceptually simple, but it is necessary for correct operation.

`produce()` loops over the outside iterator and sends the items to the consumers, one value at a time. It uses a barrier, a convenient synchronization primitive added in Python 3.2, to wait until all consumers are done with the old value before overwriting it with the new one in `next_val`. Once the new value is actually ready, a condition is broadcast. `consume()` is a generator that transmits the produced values as they arrive, until detecting `STOP`. The code can be generalized to run any number of aggregate functions in parallel by creating the consumers in a loop and adjusting their number when creating the barrier, as sketched below.
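For example, a generalization to an arbitrary number of aggregate functions might look like this (my own sketch, not from the original answer; the name `aggregate_threads` is made up):

```python
import threading
import concurrent.futures

def aggregate_threads(it, *aggregates):
    STOP = object()
    next_val = None
    # one barrier party per consumer, plus one for the producer
    consumed = threading.Barrier(len(aggregates) + 1)
    val_id = 0
    got_val = threading.Condition()

    def send(val):
        nonlocal next_val, val_id
        consumed.wait()                 # everyone is done with the old value
        with got_val:
            next_val = val
            val_id += 1
            got_val.notify_all()        # announce the new value

    def consume():
        last_val_id = -1
        while True:
            consumed.wait()
            with got_val:
                got_val.wait_for(lambda: val_id != last_val_id)
            if next_val is STOP:
                return
            yield next_val
            last_val_id = val_id

    with concurrent.futures.ThreadPoolExecutor(len(aggregates)) as executor:
        futures = [executor.submit(lambda agg=agg: agg(consume()))
                   for agg in aggregates]
        for elem in it:
            send(elem)
        send(STOP)
        return tuple(fut.result() for fut in futures)

# e.g. aggregate_threads(iter(range(1, 1000)), sum, max, min)
```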
The downside of this implementation is that it requires the creation of threads (possibly alleviated by making the thread pool global) and a lot of very careful synchronization at each iteration pass. This synchronization destroys performance - this version is almost 2000 times slower than the single-threaded `tee`, and 475 times slower than the simple but non-deterministic threaded version.
Still, as long as threads are used, there is no avoiding synchronization in some form. To completely eliminate synchronization, we must abandon threads and switch to cooperative multi-tasking. The question is whether it is possible to suspend execution of ordinary synchronous functions like `sum` and `max` in order to switch between them.
It turns out that the `greenlet` third-party extension module enables exactly that. Greenlets are an implementation of fibers, lightweight micro-threads that switch between each other explicitly. This is sort of like Python generators, which use `yield` to suspend, except greenlets offer a much more flexible suspension mechanism, allowing one to choose who to suspend to.
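As a quick illustration of that explicit switching (a minimal sketch of my own, assuming `greenlet` is installed), each greenlet chooses exactly which greenlet runs next:

```python
from greenlet import greenlet

def ping():
    print("ping")
    gr_pong.switch()   # suspend ping, resume pong
    print("ping again")

def pong():
    print("pong")
    gr_ping.switch()   # suspend pong, resume ping

gr_ping = greenlet(ping)
gr_pong = greenlet(pong)
gr_ping.switch()       # prints: ping, pong, ping again
```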
This makes it fairly easy to port the threaded version of `max_and_sum` to greenlets:
```python
import greenlet

def max_and_sum_greenlet(it):
    STOP = object()
    consumers = None

    def send(val):
        for g in consumers:
            g.switch(val)

    def produce():
        for elem in it:
            send(elem)
        send(STOP)

    def consume():
        g_produce = greenlet.getcurrent().parent
        while True:
            val = g_produce.switch()
            if val is STOP:
                return
            yield val

    sum_result = []
    max_result = []
    gsum = greenlet.greenlet(lambda: sum_result.append(sum(consume())))
    gsum.switch()
    gmax = greenlet.greenlet(lambda: max_result.append(max(consume())))
    gmax.switch()
    consumers = (gsum, gmax)
    produce()
    return sum_result[0], max_result[0]
```
The logic is the same, but with less code. As before, `produce` produces values retrieved from the source iterator, but its `send` doesn't bother with synchronization, as it doesn't need to when everything is single-threaded. Instead, it explicitly switches to every consumer in turn to do its thing, with the consumer dutifully switching right back. After going through all consumers, the producer is ready for the next iteration pass.
Results are retrieved using an intermediate single-element list because greenlet doesn't provide access to the return value of the target function (and neither does `threading.Thread`, which is why we opted for `concurrent.futures` above).
There are downsides to using greenlets, though. First, they don't come with the standard library; you need to install the greenlet extension. Second, greenlet is inherently non-portable because the stack-switching code is not supported by the OS and the compiler, and can be considered somewhat of a hack (although an extremely clever one). A Python targeting WebAssembly or the JVM or GraalVM would be very unlikely to support greenlet. This is not a pressing issue, but it's definitely something to keep in mind for the long haul.
As of Python 3.5, Python provides native coroutines. Unlike greenlets, and similar to generators, coroutines are distinct from regular functions and must be defined using `async def`. Coroutines can't be easily executed from synchronous code; they must instead be processed by a scheduler which drives them to completion. The scheduler is also known as an event loop because its other job is to receive IO events and pass them to the appropriate callbacks and coroutines. In the standard library, this is the role of the `asyncio` module.
Before implementing an asyncio-based `max_and_sum`, we must first resolve a hurdle. Unlike greenlet, asyncio is only able to suspend the execution of coroutines, not of arbitrary functions. So we need to replace `sum` and `max` with coroutines that do essentially the same thing. This is as simple as implementing them in the obvious way, only replacing `for` with `async for`, enabling the async iterator to suspend the coroutine while waiting for the next value to arrive:
```python
async def asum(it):
    s = 0
    async for elem in it:
        s += elem
    return s

async def amax(it):
    NONE_YET = object()
    largest = NONE_YET
    async for elem in it:
        if largest is NONE_YET or elem > largest:
            largest = elem
    if largest is NONE_YET:
        raise ValueError("amax() arg is an empty sequence")
    return largest

# or, using https://github.com/vxgmichel/aiostream
#
#from aiostream.stream import accumulate
#def asum(it):
#    return accumulate(it, initializer=0)
#def amax(it):
#    return accumulate(it, max)
```
One could reasonably ask if providing a new pair of aggregate functions is cheating; after all, the previous solutions were careful to use the existing `sum` and `max` built-ins. The answer will depend on the exact interpretation of the question, but I would argue that the new functions are allowed because they are in no way specific to the task at hand. They do the exact same thing the built-ins do, but consuming async iterators. I suspect that the only reason such functions don't already exist somewhere in the standard library is due to coroutines and async iterators being a relatively new feature.
With that out of the way, we can proceed to write `max_and_sum` as a coroutine:
```python
import asyncio

async def max_and_sum_asyncio(it):
    loop = asyncio.get_event_loop()
    STOP = object()
    next_val = loop.create_future()
    consumed = loop.create_future()
    used_cnt = 2  # number of consumers

    async def produce():
        for elem in it:
            next_val.set_result(elem)
            await consumed
        next_val.set_result(STOP)

    async def consume():
        nonlocal next_val, consumed, used_cnt
        while True:
            val = await next_val
            if val is STOP:
                return
            yield val
            used_cnt -= 1
            if not used_cnt:
                consumed.set_result(None)
                consumed = loop.create_future()
                next_val = loop.create_future()
                used_cnt = 2
            else:
                await consumed

    s, m, _ = await asyncio.gather(asum(consume()), amax(consume()), produce())
    return s, m
```
Although this version is based on switching between coroutines inside a single thread, just like the one using greenlet, it looks different. asyncio doesn't provide explicit switching of coroutines; it bases task switching on the `await` suspension/resumption primitive. The target of `await` can be another coroutine, but also an abstract "future", a value placeholder which will be filled in later by some other coroutine. Once the awaited value becomes available, the event loop automatically resumes execution of the coroutine, with the `await` expression evaluating to the provided value. So instead of `produce` switching to consumers, it suspends itself by awaiting a future that will arrive once all the consumers have observed the produced value.
`consume()` is an asynchronous generator, which is like an ordinary generator, except it creates an async iterator, which our aggregate coroutines are already prepared to accept by using `async for`. An async iterator's equivalent of `__next__` is called `__anext__` and is a coroutine, allowing the coroutine that exhausts the async iterator to suspend while waiting for the new value to arrive. When a running async generator suspends on an `await`, that is observed by `async for` as a suspension of the implicit `__anext__` invocation. `consume()` does exactly that when it waits for the values provided by `produce` and, as they become available, transmits them to aggregate coroutines like `asum` and `amax`. Waiting is realized using the `next_val` future, which carries the next element from `it`. Awaiting that future inside `consume()` suspends the async generator, and with it the aggregate coroutine.
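In other words, `async for` is roughly equivalent to driving the protocol by hand. A rough sketch (my own, not from the answer) of `asum` written against the raw async-iterator protocol:

```python
async def asum_manual(ait):
    s = 0
    it = ait.__aiter__()
    while True:
        try:
            elem = await it.__anext__()   # a coroutine; this is where we may suspend
        except StopAsyncIteration:
            break
        s += elem
    return s
```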
The advantage of this approach compared to greenlets' explicit switching is that it makes it much easier to combine coroutines that don't know of each other into the same event loop. For example, one could have two instances of `max_and_sum` running in parallel (in the same thread), or run a more complex aggregate function that invokes further async code to do calculations.
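For instance, something along these lines (a hypothetical sketch) runs two independent instances in one event loop:

```python
async def two_at_once():
    # each call keeps its own futures and counters, so the two runs
    # interleave on the same loop without interfering with each other
    return await asyncio.gather(
        max_and_sum_asyncio(iter(range(1, 1000))),
        max_and_sum_asyncio(iter(range(1, 2000))),
    )
```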
The following convenience function shows how to run `max_and_sum_asyncio` from non-asyncio code:
```python
def max_and_sum_asyncio_sync(it):
    # trivially instantiate the coroutine and execute it in the
    # default event loop
    coro = max_and_sum_asyncio(it)
    return asyncio.get_event_loop().run_until_complete(coro)
```
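On Python 3.7 and later, the same thing should also be expressible with `asyncio.run()`, which creates a fresh event loop and closes it afterwards (a sketch, not from the original answer; the `_37` name is made up):

```python
def max_and_sum_asyncio_sync_37(it):
    # asyncio.run() sets up a new event loop, runs the coroutine to
    # completion, and tears the loop down when it is done
    return asyncio.run(max_and_sum_asyncio(it))
```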
Measuring and comparing performance of these approaches to parallel execution can be misleading because `sum` and `max` do almost no processing, which over-stresses the overhead of parallelization. Treat these as you would treat any microbenchmarks, with a large grain of salt. Having said that, let's look at the numbers anyway!
Measurements were produced using Python 3.6. The functions were run only once and given `range(10000)`, their time measured by subtracting `time.time()` before and after the execution. Here are the results:
- `max_and_sum_buffer` and `max_and_sum_tee`: 0.66 ms - almost exactly the same time for both, with the `tee` version being a bit faster.
- `max_and_sum_threads_simple`: 2.7 ms. This timing means very little because of the non-deterministic buffering, so this might be measuring the time to start two threads and the synchronization internally performed by Python.
- `max_and_sum_threads`: 1.29 seconds, by far the slowest option, ~2000 times slower than the fastest one. This horrible result is likely caused by a combination of the multiple synchronizations performed at each step of the iteration and their interaction with the GIL.
- `max_and_sum_greenlet`: 25.5 ms, slow compared to the initial version, but much faster than the threaded version. With a sufficiently complex aggregate function, one can imagine using this version in production.
- `max_and_sum_asyncio`: 351 ms, almost 14 times slower than the greenlet version. This is a disappointing result because asyncio coroutines are more lightweight than greenlets, and switching between them should be much faster than switching between fibers. It is likely that the overhead of running the coroutine scheduler and the event loop (which in this case is overkill given that the code does no IO) is destroying the performance on this micro-benchmark.
- `max_and_sum_asyncio` using `uvloop`: 125 ms. This is more than twice the speed of regular asyncio, but still almost 5x slower than greenlet.
Running the examples under PyPy doesn't bring significant speedup; in fact, most of the examples run slightly slower, even after running them several times to ensure JIT warmup. The asyncio function requires a rewrite to avoid async generators (since PyPy as of this writing implements Python 3.5), and executes in somewhat under 100 ms. This is comparable to CPython+uvloop performance, i.e. better, but not dramatic compared to greenlet.
If it holds for your aggregate functions that `f(a,b,c,...) == f(a, f(b, f(c, ...)))`, then you could just cycle through your functions and feed them one element at a time, each time combining the new element with the result of the previous application, like `reduce` would do, e.g. like this:
```python
def aggregate(iterator, *functions):
    first = next(iterator)
    result = [first] * len(functions)
    for item in iterator:
        for i, f in enumerate(functions):
            result[i] = f((result[i], item))
    return result
```
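For example, a quick usage sketch (my own) with the built-ins mentioned above:

```python
# sum, max and min all satisfy f(a, b, c) == f(a, f(b, c)), so feeding them
# two-element tuples one step at a time gives the same results as a full pass
it = iter(range(1, 1000))
total, largest, smallest = aggregate(it, sum, max, min)
print(total, largest, smallest)   # 499500 999 1
```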
This is considerably slower (about 10-20 times) than just materializing the iterator in a list and applying the aggregate function on the list as a whole, or using `itertools.tee` (which basically does the same thing, internally), but it has the benefit of using no additional memory.
Note, however, that while this works well for functions like `sum`, `min` or `max`, it does not work for other aggregating functions, e.g. finding the mean or median element of an iterator, as `mean(a, b, c) != mean(a, mean(b, c))`. (For `mean`, you could of course just get the `sum` and divide it by the number of elements, but computing e.g. the median taking just one element at a time will be more challenging.)
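For completeness, the sum-and-count workaround for the mean boils down to a simple streaming loop (a sketch of my own, not part of the answer):

```python
def streaming_mean(iterator):
    # carry a running (total, count) pair through the iterator, one element
    # at a time, and divide only at the end
    total, count = 0, 0
    for item in iterator:
        total, count = total + item, count + 1
    if not count:
        raise ValueError("mean of an empty iterator")
    return total / count
```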