How can I asynchronously insert tasks to run in an asyncio
event loop running in another thread?
My motivation is to support interactive asynchronous workloads in the interpreter. I can't block the main REPL thread.
My current flawed understanding says that the following should work. Why doesn't it? What is a better way to accomplish goal above?
import asyncio from threading import Thread loop = asyncio.new_event_loop() def f(loop): asyncio.set_event_loop(loop) loop.run_forever() t = Thread(target=f, args=(loop,)) t.start() @asyncio.coroutine def g(): yield from asyncio.sleep(1) print('Hello, world!') asyncio.async(g(), loop=loop)
1.5.Much like we can have multiple threads running at the same time, each with their own concurrent I/O operation, we can have many coroutines running alongside one another. While we are waiting for our I/O bound coroutines to finish, we can still execute other Python code, thus giving us concurrency.
One of the cool advantages of asyncio is that it scales far better than threading . Each task takes far fewer resources and less time to create than a thread, so creating and running more of them works well. This example just creates a separate task for each site to download, which works out quite well.
asyncio. get_event_loop () Get the current event loop. If there is no current event loop set in the current OS thread, the OS thread is main, and set_event_loop() has not yet been called, asyncio will create a new event loop and set it as the current one.
In other words, we use async and await to write asynchronous code but can't run it concurrently. To run multiple operations concurrently, we'll need to use something called tasks.
You must use call_soon_threadsafe
to schedule callbacks from different threads:
import asyncio from threading import Thread loop = asyncio.new_event_loop() def f(loop): asyncio.set_event_loop(loop) loop.run_forever() t = Thread(target=f, args=(loop,)) t.start() @asyncio.coroutine def g(): yield from asyncio.sleep(1) print('Hello, world!') loop.call_soon_threadsafe(asyncio.async, g())
See https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading for more information.
EDIT: Example of an interpreter supporting asynchronous workloads
# vim: filetype=python3 tabstop=2 expandtab import asyncio as aio import random @aio.coroutine def async_eval(input_, sec): yield from aio.sleep(sec) print("") try: result = eval(input_) except Exception as e: print("< {!r} does not compute >".format(input_)) else: print("< {!r} = {} >".format(input_, result)) @aio.coroutine def main(loop): while True: input_ = yield from loop.run_in_executor(None, input, "> ") if input_ == "quit": break elif input_ == "": continue else: sec = random.uniform(5, 10) print("< {!r} scheduled for execution in {:.02} sec>".format(input_, sec)) aio.async(async_eval(input_, sec)) loop = aio.get_event_loop() loop.run_until_complete(main(loop)) loop.close()
The first example in Jashandeep Sohi's answer does not work for me in 3.7+ and prints warnings about the deprecated annotation. I reworked this into a something that runs under 3.8. I tweaked it a little to meet my needs as well. I am new to multi-threading in Python (but no multithreading in general) so any advice, guidance, etc. is appreciated:
import asyncio from threading import Thread loop = asyncio.new_event_loop() running = True def evaluate(future): global running stop = future.result() if stop: print("press enter to exit...") running = False def side_thread(loop): asyncio.set_event_loop(loop) loop.run_forever() thread = Thread(target=side_thread, args=(loop,), daemon=True) thread.start() async def display(text): await asyncio.sleep(5) print("echo:", text) return text == "exit" while running: text = input("enter text: ") future = asyncio.run_coroutine_threadsafe(display(text), loop) future.add_done_callback(evaluate) print("exiting")
The echo and other output will conflict with the prompts but it should be good enough to demonstrate it it working.
One thing I am unsure about is setting the global running
from one thread and reading it from another. I think maybe the GIL synchronizes the thread cache but I'd love to get confirmation (or not) about that.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With