I have a long_task function which runs a heavy CPU-bound calculation, and I want to make it asynchronous using the new asyncio framework. The resulting long_task_async function uses a ProcessPoolExecutor to offload the work to a different process so that it is not constrained by the GIL.
The trouble is that, for some reason, the concurrent.futures.Future instance returned from ProcessPoolExecutor.submit throws a TypeError when yielded from. Is this by design? Are those futures not compatible with the asyncio.Future class? What would be a workaround?
I also noticed that generators are not picklable, so submitting a coroutine to the ProcessPoolExecutor is going to fail. Is there a clean solution to this as well?
import asyncio
from concurrent.futures import ProcessPoolExecutor

@asyncio.coroutine
def long_task():
    yield from asyncio.sleep(4)
    return "completed"

@asyncio.coroutine
def long_task_async():
    with ProcessPoolExecutor(1) as ex:
        return (yield from ex.submit(long_task))  # TypeError: 'Future' object is not iterable
                                                  # long_task is a generator, can't be pickled

loop = asyncio.get_event_loop()

@asyncio.coroutine
def main():
    n = yield from long_task_async()
    print(n)

loop.run_until_complete(main())
You can get results from the ThreadPoolExecutor in the order that tasks are completed by calling the as_completed() module function. The function takes a collection of Future objects and will return the same Future objects in the order that their associated tasks are completed.
Typically Futures are used to enable low-level callback-based code (e.g. in protocols implemented using asyncio transports) to interoperate with high-level async/await code. The rule of thumb is to never expose Future objects in user-facing APIs, and the recommended way to create a Future object is to call loop.
The concurrent.futures module provides a high-level interface for asynchronously executing callables. The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or separate processes, using ProcessPoolExecutor.
The ThreadPoolExecutor class extends the abstract Executor class. The Executor class defines three methods used to control our thread pool; they are: submit(), map(), and shutdown(). submit(): Dispatch a function to be executed and return a future object. map(): Apply a function to an iterable of elements.
You want to use loop.run_in_executor, which uses a concurrent.futures executor but maps the return value to an asyncio future.
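A minimal sketch of that approach, written with the newer async/await syntax rather than the @asyncio.coroutine style from the question. It assumes long_task can be rewritten as a plain function (a coroutine or generator cannot be pickled and sent to a worker process); the n parameter, the sum-of-squares workload, and the executor argument are illustrative assumptions, not part of the original code.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def long_task(n):
    # Plain function standing in for the CPU-bound work (hypothetical workload).
    # Being an ordinary module-level function, it can be pickled for a worker process.
    return sum(i * i for i in range(n))


async def long_task_async(executor, n):
    loop = asyncio.get_running_loop()
    # run_in_executor submits the call to the executor and returns an
    # awaitable bound to this event loop.
    return await loop.run_in_executor(executor, long_task, n)


async def main():
    with ProcessPoolExecutor(max_workers=1) as executor:
        result = await long_task_async(executor, 1_000_000)
        print(result)


if __name__ == "__main__":
    asyncio.run(main())
```

Passing the executor in as an argument keeps the pool alive across calls instead of creating one per task, and the `if __name__ == "__main__"` guard matters on platforms where worker processes re-import the main module.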
The original asyncio PEP suggests that concurrent.futures.Future may someday grow a __iter__ method so it can be used with yield from as well, but for now the library has been designed to only require yield from support and nothing more. (Otherwise some code wouldn't actually work in 3.3.)
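If you already hold a concurrent.futures.Future from submit, one possible workaround is asyncio.wrap_future, which wraps it in an asyncio future that can be awaited. The sketch below is only an illustration of that idea: square and submit_and_await are hypothetical names, and it uses async/await instead of yield from.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def square(x):
    # Hypothetical stand-in for CPU-bound work; a plain function so it pickles.
    return x * x


async def submit_and_await(executor, fn, *args):
    # submit() returns a concurrent.futures.Future, which is not awaitable.
    cf_future = executor.submit(fn, *args)
    # wrap_future() bridges it to an asyncio future on the running loop.
    return await asyncio.wrap_future(cf_future)


async def main():
    with ProcessPoolExecutor(max_workers=1) as executor:
        print(await submit_and_await(executor, square, 7))


if __name__ == "__main__":
    asyncio.run(main())
```

In most cases loop.run_in_executor is the simpler choice; wrapping by hand is mainly useful when the submit call happens somewhere you do not control.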