I have a long_task function which runs a heavy CPU-bound calculation, and I want to make it asynchronous using the new asyncio framework. The resulting long_task_async function uses a ProcessPoolExecutor to offload the work to a different process so that it is not constrained by the GIL.
The trouble is that the concurrent.futures.Future instance returned from ProcessPoolExecutor.submit raises a TypeError when used with yield from. Is this by design? Are those futures not compatible with the asyncio.Future class? What would be a workaround?
I also noticed that generators are not picklable, so submitting a coroutine to the ProcessPoolExecutor is going to fail. Is there a clean solution to this as well?
import asyncio
from concurrent.futures import ProcessPoolExecutor

@asyncio.coroutine
def long_task():
    yield from asyncio.sleep(4)
    return "completed"

@asyncio.coroutine
def long_task_async():
    with ProcessPoolExecutor(1) as ex:
        return (yield from ex.submit(long_task))  # TypeError: 'Future' object is not iterable
        # long_task is a generator, can't be pickled

loop = asyncio.get_event_loop()

@asyncio.coroutine
def main():
    n = yield from long_task_async()
    print(n)

loop.run_until_complete(main())
You can get results from the ThreadPoolExecutor in the order that tasks are completed by calling the as_completed() module function. The function takes a collection of Future objects and will return the same Future objects in the order that their associated tasks are completed.
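As a rough illustration of as_completed(), here is a small sketch. The wait_and_return and results_in_completion_order names are made up for this example, and the sleep delays only stand in for real work.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def wait_and_return(delay):
    # Hypothetical task: sleep for `delay` seconds, then hand the delay back.
    time.sleep(delay)
    return delay


def results_in_completion_order(delays):
    # One worker per task so that every task starts immediately.
    with ThreadPoolExecutor(max_workers=len(delays)) as ex:
        futures = [ex.submit(wait_and_return, d) for d in delays]
        # as_completed yields the same Future objects as they finish,
        # not in the order they were submitted.
        return [f.result() for f in as_completed(futures)]


if __name__ == "__main__":
    print(results_in_completion_order([0.4, 0.05, 0.2]))
```

The shortest task should come back first even though it was submitted second.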
Typically Futures are used to enable low-level callback-based code (e.g. in protocols implemented using asyncio transports) to interoperate with high-level async/await code. The rule of thumb is to never expose Future objects in user-facing APIs, and the recommended way to create a Future object is to call loop.
The concurrent.futures module provides a high-level interface for asynchronously executing callables. The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or separate processes, using ProcessPoolExecutor.
The ThreadPoolExecutor class extends the abstract Executor class. The Executor class defines three methods used to control our thread pool; they are: submit(), map(), and shutdown(). submit(): Dispatch a function to be executed and return a future object. map(): Apply a function to an iterable of elements.
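The three Executor methods can be sketched together like this. The square and demo names are placeholders for the example, not anything from the library.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    # Hypothetical work function used only for this sketch.
    return x * x


def demo():
    executor = ThreadPoolExecutor(max_workers=2)
    try:
        # submit(): schedule one call and get a Future back.
        future = executor.submit(square, 3)
        single = future.result()
        # map(): apply the function to every element; results keep input order.
        mapped = list(executor.map(square, [1, 2, 3]))
    finally:
        # shutdown(): release the worker threads once pending work is done.
        executor.shutdown(wait=True)
    return single, mapped


if __name__ == "__main__":
    print(demo())
```

Using the executor as a context manager (with ThreadPoolExecutor(...) as executor:) calls shutdown() for you.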
You want to use loop.run_in_executor, which uses a concurrent.futures executor, but maps the return value to an asyncio future.
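A minimal sketch of that approach follows. It is written with the newer async/await syntax rather than @asyncio.coroutine and yield from, and it assumes long_task can be rewritten as a plain function (the sum of squares here is only a stand-in for the real calculation). A plain module-level function can be pickled, which also sidesteps the generator-pickling problem from the question.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def long_task(n):
    # A plain (non-coroutine) function, so it can be pickled and sent to a
    # worker process. The calculation is a placeholder for the real work.
    return sum(i * i for i in range(n))


async def long_task_async(executor, n):
    loop = asyncio.get_running_loop()
    # run_in_executor submits the call to the executor and returns an
    # awaitable asyncio future instead of a concurrent.futures.Future.
    return await loop.run_in_executor(executor, long_task, n)


async def main():
    with ProcessPoolExecutor(max_workers=1) as ex:
        result = await long_task_async(ex, 10_000)
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
```

Passing the executor in as an argument is just a choice made for this sketch; it makes it easy to swap in a ThreadPoolExecutor, and passing None uses the loop's default executor.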
The original asyncio PEP suggests that concurrent.futures.Future may someday grow an __iter__ method so it can be used with yield from as well, but for now the library has been designed to only require yield from support and nothing more. (Otherwise some code wouldn't actually work in 3.3.)
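If you already have a concurrent.futures.Future from submit(), asyncio.wrap_future can adapt it into something awaitable. Here is a small sketch using the newer async/await syntax; square and submit_and_await are names invented for the example, and a thread pool is used only to keep it short.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def square(x):
    # Hypothetical work function used only for this sketch.
    return x * x


async def submit_and_await(executor, x):
    # submit() returns a concurrent.futures.Future, which cannot be awaited
    # (or yielded from) directly.
    cf_future = executor.submit(square, x)
    # wrap_future adapts it into an asyncio future bound to the running loop.
    return await asyncio.wrap_future(cf_future)


async def main():
    with ThreadPoolExecutor(max_workers=1) as ex:
        print(await submit_and_await(ex, 7))


if __name__ == "__main__":
    asyncio.run(main())
```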