
Multi-threaded asyncio in Python

I'm currently taking my first steps with asyncio in Python 3.5, and there is one problem that's bugging me. Obviously I haven't fully understood coroutines yet...

Here is a simplified version of what I'm doing.

In my class I have an open() method that creates a new thread. Within that thread I create a new event loop and a socket connection to some host. Then I let the loop run forever.

def open(self):
    # create thread
    self.thread = threading.Thread(target=self._thread)
    self.thread.start()
    # wait for connection
    while self.protocol is None:
        time.sleep(0.1)

def _thread(self):
    # create loop, connection and run forever
    self.loop = asyncio.new_event_loop()
    coro = self.loop.create_connection(lambda: MyProtocol(self.loop),
                                       'somehost.com', 1234)
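    # create_connection() yields (transport, protocol); keep the protocol for open()
    _, self.protocol = self.loop.run_until_complete(coro)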
    self.loop.run_forever()

Stopping the connection is now quite simple: I just stop the loop from the main thread.

loop.call_soon_threadsafe(loop.stop)

Unfortunately, I need to do some cleanup first. In particular, I need to empty a queue before disconnecting from the server. So I tried something like this stop() method in MyProtocol:

class MyProtocol(asyncio.Protocol):
    def __init__(self, loop):
        self._loop = loop
        self._queue = []

    async def stop(self):
        # wait for all queues to empty
        while self._queue:
            await asyncio.sleep(0.1)
        # disconnect
        self.close()
        self._loop.stop()

The queue gets emptied from within the protocol's data_received() method, so I just want to wait for that to happen using the while loop with the asyncio.sleep() call. Afterwards I close the connection and stop the loop.

But how do I call this method from the main thread and wait for it to finish? I tried the following, but none of these attempts seems to work (protocol is the MyProtocol instance currently in use):

loop.call_soon_threadsafe(protocol.stop)
loop.call_soon_threadsafe(functools.partial(asyncio.ensure_future, protocol.stop(), loop=loop))
asyncio.ensure_future(protocol.stop(), loop=loop)

Can anyone please help me here? Thanks!

The_Fallen asked Jan 07 '23 08:01



1 Answer

Basically, you want to schedule a coroutine on an event loop that runs in a different thread. You can use run_coroutine_threadsafe (added in Python 3.5.1) for that:

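# pass the coroutine object, i.e. call protocol.stop(), not the bare method
future = asyncio.run_coroutine_threadsafe(protocol.stop(), loop)
future.result()  # block the calling thread until stop() has finished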
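
To make the hand-off concrete, here is a minimal sketch of the same idea without a real socket. The Worker class, its queue contents, and the _drain_one() helper are hypothetical stand-ins for the class in the question (the helper plays the role of data_received() emptying the queue); only run_coroutine_threadsafe, call_soon_threadsafe, and call_later are real asyncio calls.

```python
import asyncio
import threading


class Worker:
    """Hypothetical stand-in for the class in the question; no real socket."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.queue = [1, 2, 3]  # pretend these items are still pending
        self.thread = threading.Thread(target=self._run)

    def _run(self):
        # Simulate data_received() removing one item every 20 ms.
        self.loop.call_later(0.02, self._drain_one)
        self.loop.run_forever()

    def _drain_one(self):
        if self.queue:
            self.queue.pop(0)
            self.loop.call_later(0.02, self._drain_one)

    async def stop(self):
        # Executes inside the worker thread's event loop.
        while self.queue:
            await asyncio.sleep(0.01)
        return "drained"


worker = Worker()
worker.thread.start()

# stop() is called here to build the coroutine object that gets scheduled.
future = asyncio.run_coroutine_threadsafe(worker.stop(), worker.loop)
result = future.result(timeout=5)  # blocks the main thread until stop() returns

# Stop the loop only after the result has been handed back, then clean up.
worker.loop.call_soon_threadsafe(worker.loop.stop)
worker.thread.join(timeout=5)
worker.loop.close()
```

One design choice in this sketch: the coroutine does not call loop.stop() itself. As far as I can tell, the result is copied to the waiting thread by a callback that runs on a later loop iteration, so stopping the loop from inside the coroutine can leave future.result() blocked. Stopping the loop from the main thread after result() returns avoids that.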

Or use the older approach from https://stackoverflow.com/a/32084907/681044 and schedule the coroutine with call_soon_threadsafe:

import asyncio
from threading import Thread

loop = asyncio.new_event_loop()

def f(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

t = Thread(target=f, args=(loop,))
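t.start()

# asyncio.async was renamed to asyncio.ensure_future in Python 3.4.4, and
# "async" is a reserved word since Python 3.7, so the old name no longer parses
async def g():
    await asyncio.sleep(1)
    print('Hello, world!')

# create the task from inside the loop's own thread
loop.call_soon_threadsafe(asyncio.ensure_future, g())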
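
Scheduling through call_soon_threadsafe is fire-and-forget: it gives the calling thread nothing to wait on. If the main thread has to wait anyway, one option is to signal completion yourself. The sketch below assumes a threading.Event named done and a messages list, both hypothetical names used only for illustration, and also shows how the loop thread might be shut down afterwards.

```python
import asyncio
import threading

loop = asyncio.new_event_loop()
t = threading.Thread(target=loop.run_forever)
t.start()

done = threading.Event()  # hypothetical completion signal
messages = []             # collects output so it can be inspected later


async def g():
    await asyncio.sleep(0.05)
    messages.append('Hello, world!')
    done.set()  # tell the waiting thread that g() has finished


# Fire and forget: the task is created inside the loop's own thread.
loop.call_soon_threadsafe(asyncio.ensure_future, g())

finished = done.wait(timeout=5)  # main thread blocks until g() signals

# Stop the loop so the non-daemon thread can exit, then release resources.
loop.call_soon_threadsafe(loop.stop)
t.join(timeout=5)
loop.close()
```

Compared with run_coroutine_threadsafe, this needs extra plumbing for return values and exceptions, so it is mainly of interest on versions that predate that function.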
kwarunek answered Jan 08 '23 22:01
