Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

python asyncio run event loop once?

I am trying to understand the asyncio library, specifically with using sockets. I have written some code in an attempt to gain understanding,

I wanted to run a sender and a receiver sockets asynchrounously. I got to the point where I get all data sent up till the last one, but then I have to run one more loop. Looking at how to do this, I found this link from stackoverflow, which I implemented below -- but what is going on here? Is there a better/more sane way to do this than to call stop followed by run_forever?

The documentation for stop() in the event loop is:

Stop running the event loop.

Every callback scheduled before stop() is called will run. Callbacks scheduled after stop() is called will not run. However, those callbacks will run if run_forever() is called again later.

And run_forever()'s documentation is:

Run until stop() is called.

Questions:

  • why in the world is run_forever the only way to run_once? This doesn't even make sense
  • Is there a better way to do this?
  • Does my code look like a reasonable way to program with the asyncio library?
  • Is there a better way to add tasks to the event loop besides asyncio.async()? loop.create_task gives an error on my Linux system.

https://gist.github.com/cloudformdesign/b30e0860497f19bd6596

like image 357
vitiral Avatar asked Apr 25 '15 17:04

vitiral


1 Answers

The stop(); run_forever() trick works because of how stop is implemented:

def stop(self):
    """Stop running the event loop.

    Every callback scheduled before stop() is called will run.
    Callback scheduled after stop() is called won't.  However,
    those callbacks will run if run() is called again later.
    """
    self.call_soon(_raise_stop_error)

def _raise_stop_error(*args):
    raise _StopError

So, next time the event loop runs and executes pending callbacks, it's going to call _raise_stop_error, which raises _StopError. The run_forever loop will break only on that specific exception:

def run_forever(self):
    """Run until stop() is called."""
    if self._running:
        raise RuntimeError('Event loop is running.')
    self._running = True
    try:
        while True:
            try:
                self._run_once()
            except _StopError:
                break
    finally:
        self._running = False

So, by scheduling a stop() and then calling run_forever, you end up running one iteration of the event loop, then stopping once it hits the _raise_stop_error callback. You may have also noticed that _run_once is defined and called by run_forever. You could call that directly, but that can sometimes block if there aren't any callbacks ready to run, which may not be desirable. I don't think there's a cleaner way to do this currently - That answer was provided by Andrew Svetlov, who is an asyncio contributor; he would probably know if there's a better option. :)

In general, your code looks reasonable, though I think that you shouldn't be using this run_once approach to begin with. It's not deterministic; if you had a longer list or a slower system, it might require more than two extra iterations to print everything. Instead, you should just send a sentinel that tells the receiver to shut down, and then wait on both the send and receive coroutines to finish:

import sys
import time
import socket
import asyncio


addr = ('127.0.0.1', 1064)
SENTINEL = b"_DONE_" 

# ... (This stuff is the same)

@asyncio.coroutine
def sending(addr, dataiter):
    loop = asyncio.get_event_loop()
    for d in dataiter:
        print("Sending:", d)
        sock = socket.socket()
        yield from send_close(loop, sock, addr, str(d).encode())
    # Send a sentinel
    sock = socket.socket()
    yield from send_close(loop, sock, addr, SENTINEL)

@asyncio.coroutine
def receiving(addr):
    loop = asyncio.get_event_loop()
    sock = socket.socket()
    try:
        sock.setblocking(False)
        sock.bind(addr)
        sock.listen(5)

        while True:
            data = yield from accept_recv(loop, sock)
            if data == SENTINEL:  # Got a sentinel
                return
            print("Recevied:", data)
    finally: sock.close()

def main():
    loop = asyncio.get_event_loop()
    # add these items to the event loop
    recv = asyncio.async(receiving(addr), loop=loop)
    send = asyncio.async(sending(addr, range(10)), loop=loop)
    loop.run_until_complete(asyncio.wait([recv, send]))

main()

Finally, asyncio.async is the right way to add tasks to the event loop. create_task was added in Python 3.4.2, so if you have an earlier version it won't exist.

like image 76
dano Avatar answered Sep 18 '22 05:09

dano