Asyncio detecting disconnect hangs

I'm using asyncio in Python 3.4. I'll try to explain what I'm doing so far and what I think is causing the issue.

On one end I have a UDP connection framework with blocking operations. I take the data I get from this stream and build JSON that I pass out to the client in SSE format. This is all working great.

The issue I'm running into is that I can't get it to handle client disconnects properly. If I don't do anything and a client disconnects, I start getting this warning:

WARNING [selector_events:613] socket.send() raised exception.

This happens because the loop is still running. I've been looking into ways to cleanly break the loop and trigger the .close(), but I'm running into issues with the examples I've found, and there aren't many resources online.

The one example that seems to actually work is to try reading a line from the client; if the result is an empty string, the client has disconnected.

    while True:
        data = (yield from client_reader.readline())
        if not data: #client disconnected
            break

However, after about ten messages, all messages to the client stop. I think this is because it hangs on "data = (yield from client_reader.readline())". If I close the client after it hangs, it does shut down properly and "End Connection" gets logged. Any ideas why it might be hanging? I think I have a pretty good handle on asyncio at this point, but this one is puzzling me.

Note: location() and status() are my two calls to get info from the UDP socket - I've successfully run them without issues for many hours with this same code - minus the client disconnect lines.

clients = {}

def accept_client(client_reader, client_writer):
    task = asyncio.Task(handle_client(client_reader, client_writer))
    clients[task] = (client_writer)

    def client_done(task):
        del clients[task]
        client_writer.close()
        log.info("End Connection")

    log.info("New Connection")
    task.add_done_callback(client_done)

@asyncio.coroutine
def handle_client(client_reader, client_writer):
    data = {'result':{'status':'Connection Ready'}}
    yield from postmessage(data,client_writer)
    while True:
        data = (yield from client_reader.readline())
        if not data: #client disconnected
            break
        data = yield from asyncio.wait_for(location(),
                                           timeout=1.0)
        yield from postmessage(data,client_writer)

        data = yield from asyncio.wait_for(status(),
                                           timeout=1.0)
        yield from postmessage(data,client_writer)

@asyncio.coroutine
def postmessage(data, client_writer):
        mimetype=('text/event-stream')
        response = ('data: {0}\n\n'.format(data).encode('utf-8'))
        client_writer.write(response)
        client_writer.drain()

Update: if I add a timeout on the "yield from client_reader.readline()", I get the following error at the point where it would normally hang.

2014-11-17 03:13:56,214 INFO [try:23] End Connection
2014-11-17 03:13:56,214 ERROR [base_events:912] Task exception was never retrieved
future: <Task finished coro=<handle_client() done, defined at try.py:29> exception=TimeoutError()>
Traceback (most recent call last):
  File "/opt/python3.4.2/lib/python3.4/asyncio/tasks.py", line 236, in _step
    result = next(coro)
  File "try.py", line 35, in handle_client
    timeout=1.0))
  File "/opt/python3.4.2/lib/python3.4/asyncio/tasks.py", line 375, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError

Here's a sample script showing the bug in action. Just run it in Python 3.4.2, and after 9 iterations it'll hang on reading from the client.

(The script is complete, so you can run it and see for yourself.)

import asyncio
import logging

import json
import time

log = logging.getLogger(__name__)

clients = {}

def accept_client(client_reader, client_writer):
    task = asyncio.Task(handle_client(client_reader, client_writer))
    clients[task] = (client_writer)

    def client_done(task):
        del clients[task]
        client_writer.close()
        log.info("End Connection")

    log.info("New Connection")
    task.add_done_callback(client_done)


@asyncio.coroutine
def handle_client(client_reader, client_writer):
    data = {'result':{'status':'Connection Ready'}}
    postmessage(data,client_writer)
    count = 0
    while True:
        data = (yield from asyncio.wait_for(client_reader.readline(),timeout=1.0))
        if not data: #client disconnected
            break

        data = yield from asyncio.wait_for(test1(),timeout=1.0)
        yield from postmessage(data,client_writer)

        data = yield from asyncio.wait_for(test2(),timeout=1.0)
        yield from postmessage(data,client_writer)

@asyncio.coroutine
def postmessage(data, client_writer):
        mimetype=('text/event-stream')
        response = ('data: {0}\n\n'.format(data).encode('utf-8'))
        client_writer.write(response)
        client_writer.drain()

@asyncio.coroutine
def test1():
        data = {'result':{
                        'test1':{ }
                    }
                }
        data = json.dumps(data)
        return data

@asyncio.coroutine
def test2():
        data = {'result':{ 
                    'test2':{ }
                    }
                }
        data = json.dumps(data)
        return data 

def main():
    loop = asyncio.get_event_loop()
    f = asyncio.start_server(accept_client, host=None, port=2991)
    loop.run_until_complete(f)
    loop.run_forever()

if __name__ == '__main__':
    log = logging.getLogger("")
    formatter = logging.Formatter("%(asctime)s %(levelname)s " +
                                  "[%(module)s:%(lineno)d] %(message)s")
    # log the things
    log.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)

    ch.setFormatter(formatter)
    log.addHandler(ch)
    main()

Another update: I've found that it dies because it reads all the lines of the client's header and then times out when it runs out of lines. I think the real answer I'm looking for is how to detect client disconnects when you don't actually need to receive data from the client (other than the initial connection).

asked Nov 17 '14 03:11 by KDM


3 Answers

OK, I think I understand your issue. You're reading from the client just to find out whether the client has disconnected, but once the client has sent its header, readline() will block indefinitely while the client is still connected, which prevents you from doing any actual work. Using a timeout to avoid the blocking is fine; you just need to handle the TimeoutError. When it occurs, you can assume the client hasn't disconnected:

from concurrent.futures import TimeoutError

@asyncio.coroutine
def handle_client(client_reader, client_writer):
    data = {'result':{'status':'Connection Ready'}}
    yield from postmessage(data,client_writer)  # without "yield from" the coroutine never runs
    count = 0
    while True:
        try:
            # See if client has disconnected.
            data = (yield from asyncio.wait_for(client_reader.readline(),timeout=0.01))
            if not data: # Client disconnected
                break
        except TimeoutError:
            pass  # Client hasn't disconnected.

        data = yield from asyncio.wait_for(test1(),timeout=1.0)
        yield from postmessage(data,client_writer)

        data = yield from asyncio.wait_for(test2(),timeout=1.0)
        yield from postmessage(data,client_writer)

Note that I made the timeout very short here. We really don't want to block at all; we just want to know whether the connection has closed.
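
As a hedged sketch, the same polling check can be pulled into a small helper. It uses the newer async/await syntax instead of the generator-based coroutines above (an assumption about the Python version in use), and the name client_disconnected is made up for illustration:

```python
import asyncio


async def client_disconnected(reader, timeout=0.01):
    """Return True if the peer has closed the connection (EOF), else False.

    Hypothetical helper: `reader` is assumed to be an asyncio.StreamReader.
    """
    try:
        data = await asyncio.wait_for(reader.readline(), timeout=timeout)
    except asyncio.TimeoutError:
        # Nothing arrived within the timeout, so assume the client is still there.
        return False
    # readline() returns b'' only at EOF; any other data (for example leftover
    # request header lines) means the client is still connected.
    return not data
```

Inside the send loop this would be used as `if await client_disconnected(client_reader): break`. Any line the client did send is consumed and discarded, which is fine only when the server does not need that input.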

But an even better solution would be to not explicitly check to see if the connection has closed, and instead handle whatever exception you get when you try sending data over the socket when the connection has been closed:

@asyncio.coroutine
def handle_client(client_reader, client_writer):
    data = {'result':{'status':'Connection Ready'}}
    yield from postmessage(data,client_writer)  # without "yield from" the coroutine never runs
    count = 0
    while True:
        try:
            data = yield from asyncio.wait_for(test1(),timeout=1.0)
            yield from postmessage(data,client_writer)

            data = yield from asyncio.wait_for(test2(),timeout=1.0)
            yield from postmessage(data,client_writer)
        except ConnectionResetError:  # And/or whatever other exceptions you see.
            break
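
This approach only works if the write path actually waits on the socket. Below is a minimal sketch, assuming async/await syntax and assuming that awaiting drain() raises ConnectionResetError (or a related error such as BrokenPipeError) once asyncio has noticed the lost connection; the exact exception may differ by platform. stream_events and make_event are hypothetical names, not part of the original code:

```python
import asyncio


async def postmessage(data, client_writer):
    """Send one SSE event and wait until the write buffer has been flushed."""
    response = "data: {0}\n\n".format(data).encode("utf-8")
    client_writer.write(response)
    # Awaiting drain() is what allows a lost connection to surface as an
    # exception instead of a logged warning.
    await client_writer.drain()


async def stream_events(client_writer, make_event, interval=1.0):
    """Send events until a write fails; return how many were sent.

    `make_event` is a hypothetical callable returning the payload string.
    """
    sent = 0
    while True:
        try:
            await postmessage(make_event(), client_writer)
        except (ConnectionResetError, BrokenPipeError):
            break  # The client went away; stop sending.
        sent += 1
        await asyncio.sleep(interval)
    return sent
```

Calling drain() without awaiting it (or without "yield from" in the generator style) only creates the coroutine object and never runs it, so no exception would reach the loop.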

answered Nov 10 '22 13:11 by dano


I read the source code and found an easy way to do this:

if client_writer.transport._conn_lost:
    print('Connection is lost')
    # break some loop
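
Note that _conn_lost is a private attribute and may change between releases. A hedged alternative, assuming a Python version where transports expose the public is_closing() method (it was added after the 3.4.2 release used in the question), is sketched below; connection_lost is a hypothetical helper name. As far as I can tell, like the private counter it only turns true once asyncio itself has noticed the failure (typically after a failed write) or after close() was called locally, so it is not an instant disconnect detector:

```python
import asyncio


def connection_lost(client_writer):
    """Return True if the writer's transport is closing or already closed.

    Hypothetical helper; relies on the public transport.is_closing() method
    instead of the private _conn_lost counter.
    """
    return client_writer.transport.is_closing()
```

In a send loop this would read `if connection_lost(client_writer): break`, placed before each write.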

answered Nov 10 '22 13:11 by Gerald


I had the same problem with select(), which is also unable to detect a dead socket while waiting for an event.

I solved my problem by using a non-blocking read() and treating a read event reported by select() that delivers 0 bytes as a dead connection.

The death of a socket while code is executing in kernel mode (a system call) generates neither a signal nor an exception. You seem to be left with a heuristic approach: treat a read() that returns zero bytes as a dead link/socket. It's not what I consider elegant, but I haven't been able to find anything better (for the moment).
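
A rough sketch of that heuristic with plain sockets follows. It is an illustration under assumptions, not the code I actually used: peer_has_closed is a made-up name, and it peeks with MSG_PEEK instead of doing a real non-blocking read, so pending data stays in the buffer for the normal reader:

```python
import select
import socket


def peer_has_closed(sock, timeout=0.0):
    """Return True if select() reports a read event that yields zero bytes."""
    readable, _, _ = select.select([sock], [], [], timeout)
    if not readable:
        # No read event at all: nothing can be concluded, assume still alive.
        return False
    try:
        # MSG_PEEK looks at the next byte without removing it from the buffer.
        data = sock.recv(1, socket.MSG_PEEK)
    except BlockingIOError:
        return False  # Spurious wakeup on a non-blocking socket.
    except ConnectionResetError:
        return True   # The peer reset the connection.
    # A readable socket that delivers zero bytes means the peer closed it.
    return data == b""
```

This detects an orderly close or a reset. A peer that simply vanishes (for example, a pulled network cable) produces no read event, so it would still go unnoticed until a write fails or a keepalive times out.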

answered Nov 10 '22 13:11 by dcexcal