Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Send asyncio tasks to loop running in other thread

How can I asynchronously insert tasks to run in an asyncio event loop running in another thread?

My motivation is to support interactive asynchronous workloads in the interpreter. I can't block the main REPL thread.

Example

My current flawed understanding says that the following should work. Why doesn't it? What is a better way to accomplish goal above?

import asyncio from threading import Thread  loop = asyncio.new_event_loop()  def f(loop):     asyncio.set_event_loop(loop)     loop.run_forever()  t = Thread(target=f, args=(loop,)) t.start()      @asyncio.coroutine def g():     yield from asyncio.sleep(1)     print('Hello, world!')  asyncio.async(g(), loop=loop) 
like image 870
MRocklin Avatar asked Aug 17 '15 21:08

MRocklin


People also ask

Does Asyncio run on multiple threads?

1.5.Much like we can have multiple threads running at the same time, each with their own concurrent I/O operation, we can have many coroutines running alongside one another. While we are waiting for our I/O bound coroutines to finish, we can still execute other Python code, thus giving us concurrency.

Is Asyncio faster than threading?

One of the cool advantages of asyncio is that it scales far better than threading . Each task takes far fewer resources and less time to create than a thread, so creating and running more of them works well. This example just creates a separate task for each site to download, which works out quite well.

What is Asyncio Get_event_loop ()?

asyncio. get_event_loop () Get the current event loop. If there is no current event loop set in the current OS thread, the OS thread is main, and set_event_loop() has not yet been called, asyncio will create a new event loop and set it as the current one.

Does Asyncio run concurrently?

In other words, we use async and await to write asynchronous code but can't run it concurrently. To run multiple operations concurrently, we'll need to use something called tasks.


2 Answers

You must use call_soon_threadsafe to schedule callbacks from different threads:

import asyncio from threading import Thread  loop = asyncio.new_event_loop()  def f(loop):     asyncio.set_event_loop(loop)     loop.run_forever()  t = Thread(target=f, args=(loop,)) t.start()      @asyncio.coroutine def g():     yield from asyncio.sleep(1)     print('Hello, world!')  loop.call_soon_threadsafe(asyncio.async, g()) 

See https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading for more information.

EDIT: Example of an interpreter supporting asynchronous workloads

# vim: filetype=python3 tabstop=2 expandtab  import asyncio as aio import random  @aio.coroutine def async_eval(input_, sec):   yield from aio.sleep(sec)   print("")   try:     result = eval(input_)   except Exception as e:     print("< {!r} does not compute >".format(input_))   else:       print("< {!r} = {} >".format(input_, result))  @aio.coroutine def main(loop):   while True:     input_ = yield from loop.run_in_executor(None, input, "> ")      if input_ == "quit":       break     elif input_ == "":       continue     else:       sec = random.uniform(5, 10)       print("< {!r} scheduled for execution in {:.02} sec>".format(input_, sec))       aio.async(async_eval(input_, sec))  loop = aio.get_event_loop()  loop.run_until_complete(main(loop)) loop.close() 
like image 71
Jashandeep Sohi Avatar answered Sep 28 '22 09:09

Jashandeep Sohi


The first example in Jashandeep Sohi's answer does not work for me in 3.7+ and prints warnings about the deprecated annotation. I reworked this into a something that runs under 3.8. I tweaked it a little to meet my needs as well. I am new to multi-threading in Python (but no multithreading in general) so any advice, guidance, etc. is appreciated:

import asyncio from threading import Thread   loop = asyncio.new_event_loop() running = True   def evaluate(future):     global running     stop = future.result()     if stop:         print("press enter to exit...")         running = False   def side_thread(loop):     asyncio.set_event_loop(loop)     loop.run_forever()   thread = Thread(target=side_thread, args=(loop,), daemon=True) thread.start()   async def display(text):     await asyncio.sleep(5)     print("echo:", text)     return text == "exit"   while running:   text = input("enter text: ")   future = asyncio.run_coroutine_threadsafe(display(text), loop)   future.add_done_callback(evaluate)   print("exiting") 

The echo and other output will conflict with the prompts but it should be good enough to demonstrate it it working.

One thing I am unsure about is setting the global running from one thread and reading it from another. I think maybe the GIL synchronizes the thread cache but I'd love to get confirmation (or not) about that.

like image 43
JimmyJames Avatar answered Sep 28 '22 08:09

JimmyJames