 

Python: Reading long lines with asyncio.StreamReader.readline()

The asyncio version of readline() (1) allows reading a single line from a stream asynchronously. However, if it encounters a line that is longer than the limit, it raises an exception (2). It is unclear how to resume reading after such an exception is raised.

I would like a function similar to readline() that simply discards the parts of lines that exceed the limit and continues reading until the stream ends. Does such a method exist? If not, how can I write one?

1: https://docs.python.org/3/library/asyncio-stream.html#streamreader

2: https://github.com/python/cpython/blob/3.12/Lib/asyncio/streams.py#L549

static_rtti asked Oct 20 '25 11:10


2 Answers

The standard library implementation of asyncio.StreamReader.readline is almost what you want. That function raises a ValueError in the event of a buffer overrun, but it first removes a chunk of data from the buffer and discards it. If you catch the ValueError and keep calling readline, reading will continue. But you would lose the first chunk of data, which is not what you want.
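A quick way to see this without a server is to feed a StreamReader by hand with feed_data() and feed_eof() (an illustration only; the 8-byte limit is arbitrary). Here the whole long line is already in the buffer, so readline() drops all of it, up to and including the newline, before raising. If the newline had not arrived yet, it would clear the buffer instead, and the next call would typically return the tail of the line.

```python
import asyncio


async def readline_after_overrun():
    """Show what StreamReader.readline() does with an overlong line.

    The reader is fed by hand instead of by a socket; in real code the
    StreamReaderProtocol does this.
    """
    reader = asyncio.StreamReader(limit=8)
    reader.feed_data(b"0123456789abcdef\nshort\n")  # first line: 16 bytes > 8
    reader.feed_eof()

    results = []
    for _ in range(3):
        try:
            results.append(await reader.readline())
        except ValueError as e:  # readline() turns LimitOverrunError into ValueError
            results.append(type(e).__name__)
    return results


print(asyncio.run(readline_after_overrun()))
# ['ValueError', b'short\n', b'']
```

The long line is gone entirely; only the following line survives.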

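To achieve what you want you can call a slightly lower-level function, StreamReader.readuntil(separator). Calling reader.readuntil(b'\n') is roughly equivalent to reader.readline(): the separator must be a bytes object, and at the end of the stream readuntil raises asyncio.IncompleteReadError instead of returning the partial line. On an overrun this function raises an asyncio.LimitOverrunError, which has an instance variable named consumed. This is an integer giving the number of buffered bytes that precede the separator or, if no separator has arrived yet, the current length of the buffer contents.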
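The following sketch checks these points without a network connection by feeding a StreamReader directly. fed_reader, overrun_consumed and partial_line_at_eof are hypothetical helpers, and the 8-byte limit is arbitrary. consumed comes out as 20 both when a newline follows the 20 bytes and when no newline has been received at all.

```python
import asyncio


def fed_reader(data, limit=8):
    """Hypothetical helper: a StreamReader pre-loaded with `data` and EOF.
    Call it from inside a coroutine so the reader binds to the running loop."""
    reader = asyncio.StreamReader(limit=limit)
    reader.feed_data(data)
    reader.feed_eof()
    return reader


async def overrun_consumed(data):
    """Return LimitOverrunError.consumed for the first readuntil() call."""
    try:
        await fed_reader(data).readuntil(b'\n')
    except asyncio.LimitOverrunError as e:
        return e.consumed
    return None  # no overrun happened


async def partial_line_at_eof():
    """readline() returns an unterminated last line; readuntil() raises."""
    from_readline = await fed_reader(b"tail").readline()
    try:
        await fed_reader(b"tail").readuntil(b'\n')
    except asyncio.IncompleteReadError as e:
        return from_readline, e.partial


print(asyncio.run(overrun_consumed(b"x" * 20 + b"\nrest\n")))  # 20 (separator buffered)
print(asyncio.run(overrun_consumed(b"x" * 20)))                # 20 (no separator yet)
print(asyncio.run(partial_line_at_eof()))                      # (b'tail', b'tail')
```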

There are a couple of strange details here. Despite the variable's name, these bytes have not been "consumed"; they are still in the buffer. In order to keep the stream going you have to consume them yourself, which you can do by calling StreamReader.readexactly. The second strange thing is that the value of LimitOverrunError.consumed can be greater than the value of the keyword argument limit= that you originally passed to the open_connection function. That argument was supposed to set the size of the input buffer, but it appears that it merely sets the threshold for raising the LimitOverrunError. Consequently, the buffer might already contain the separator, but fortunately the byte count in LimitOverrunError.consumed does not include the separator. So a call to readexactly will not consume the separator.
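Both details can be observed with a reader fed by hand (a sketch with an arbitrary 8-byte limit, not part of the answer's program): consumed (16) exceeds the limit, readexactly(e.consumed) returns exactly the bytes in front of the newline, and the newline is what the next readuntil() returns.

```python
import asyncio


async def consume_after_overrun():
    """Recover from LimitOverrunError by reading the reported bytes."""
    reader = asyncio.StreamReader(limit=8)
    # All data arrives at once, so the buffer holds far more than `limit`
    # bytes and already contains the separator.
    reader.feed_data(b"0123456789abcdef\nrest\n")
    reader.feed_eof()
    try:
        await reader.readuntil(b'\n')
    except asyncio.LimitOverrunError as e:
        # readuntil() removed nothing; the 16 bytes are still buffered.
        head = await reader.readexactly(e.consumed)
        # consumed stops just before the separator, so it comes next.
        sep = await reader.readuntil(b'\n')
        rest = await reader.readuntil(b'\n')
        return e.consumed, head, sep, rest


print(asyncio.run(consume_after_overrun()))
# (16, b'0123456789abcdef', b'\n', b'rest\n')
```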

You can achieve exactly what you want by calling readuntil, with added logic to handle LimitOverrunError. In the exception handler you save the first chunk of data, discard any further chunks of the same line, and continue to read from the stream. When you finally encounter the end of the line you return the first chunk that you saved.

Here is a full working demo program. To test the code I set limit=512 in asyncio.open_connection. The server transmits a line that is longer than this, followed by some shorter lines. The client reads each of the lines without any exception being raised.

The function you are asking for is readline_no_limit.

import asyncio

_PORT = 54242

MY_LIMIT = 512

async def readline_no_limit(reader):
    """ Return a bytes object.  If there has not been a buffer
    overrun the returned value will include the line terminator,
    otherwise not.  Return b'' at the end of the stream.

    The length of the returned value may be greater than the limit
    specified in the original call to open_connection."""

    discard = False
    first_chunk = b''
    while True:
        try:
            chunk = await reader.readuntil(b'\n')
            if not discard:
                return chunk
            break  # end of an overlong line; its terminator is dropped
        except asyncio.IncompleteReadError as e:
            # EOF before a newline: like readline(), return what is left
            return first_chunk if discard else e.partial
        except asyncio.LimitOverrunError as e:
            print(f"Overrun detected, buffer length now={e.consumed}")
            # readuntil() left these bytes in the buffer; remove them
            chunk = await reader.readexactly(e.consumed)
            if not discard:
                first_chunk = chunk  # keep only the first chunk
            discard = True
    return first_chunk

async def client():
    await asyncio.sleep(1.0)  # give the server time to start
    reader, writer = await asyncio.open_connection(host='localhost',
        port=_PORT, limit=MY_LIMIT)
    writer.write(b"Hello\n")
    while True:
        line = await readline_no_limit(reader)
        print(f"Received {len(line)} bytes, first 40: {line[:40]}")
        if not line or line == b"End\n":  # b'' means the server hung up
            break
    writer.write(b"Quit\n")
    writer.close()  # flushes "Quit" and closes the connection
    await writer.wait_closed()

async def got_cnx(reader, writer):
    while True:
        msg = await reader.readline()
        if not msg or msg == b"Quit\n":  # b'' means the client hung up
            break
        if msg != b"Hello\n":
            continue
        long_line = ' '.join([format(x, "x") for x in range(1000)])
        writer.write(bytes(long_line, encoding='utf8'))
        writer.write(b"\n")
        writer.write(b"Short line\n")
        writer.write(b"End\n")
        await writer.drain()
    writer.close()

async def main():
    def shutdown(_future):
        print("\nClient quit, server shutdown")
        server.close()
    client_task = asyncio.create_task(client())
    server = await asyncio.start_server(got_cnx, host='localhost', port=_PORT,
        start_serving = False)
    client_task.add_done_callback(shutdown)
    try:
        await server.serve_forever()
    except asyncio.CancelledError:
        print("Server closed normally")

if __name__ == "__main__":
    asyncio.run(main())

Output:

Overrun detected, buffer length now=3727
Received 3727 bytes, first 40: b'0 1 2 3 4 5 6 7 8 9 a b c d e f 10 11 12'
Received 11 bytes, first 40: b'Short line\n'
Received 4 bytes, first 40: b'End\n'

Client quit, server shutdown
Server closed normally
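The figure 3727 in this output is the full length of the long line without its newline, which can be checked by counting digits: 16 one-digit, 240 two-digit and 744 three-digit hex numbers, plus 999 spaces. So in this run the saved first chunk was the entire line, well above MY_LIMIT = 512. The snippet below just recomputes that arithmetic.

```python
# Recompute the length of the long line sent by got_cnx(), without the
# trailing newline: hex strings for 0..999 joined by single spaces.
long_line = ' '.join(format(x, "x") for x in range(1000))

one_digit = sum(1 for x in range(1000) if x < 0x10)            # 0 .. f
two_digit = sum(1 for x in range(1000) if 0x10 <= x < 0x100)   # 10 .. ff
three_digit = sum(1 for x in range(1000) if x >= 0x100)        # 100 .. 3e7
spaces = 1000 - 1                                              # separators

expected = one_digit * 1 + two_digit * 2 + three_digit * 3 + spaces
print(one_digit, two_digit, three_digit, expected, len(long_line))
# 16 240 744 3727 3727
```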

Python 3.11, Ubuntu

Paul Cornelius answered Oct 23 '25 01:10


In the asyncio.open_connection() code there is this comment: "If you want to customize the StreamReader ... just copy the code ...".


OK, let's try. I copied the function and changed only the line that creates the StreamReader:

import asyncio

_DEFAULT_LIMIT = 2 ** 16  # 64 KiB

async def open_trunc_connection(
        host=None, port=None, *, limit=_DEFAULT_LIMIT, **kwds):
    loop = asyncio.get_running_loop()
    reader = TruncatingStreamReader(limit=limit, loop=loop) # <-- own reader
    protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
    transport, _ = await loop.create_connection(
        lambda: protocol, host, port, **kwds)
    writer = asyncio.StreamWriter(transport, protocol, reader, loop)
    return reader, writer

Now the reader, starting with the higher level just for the discussion. The variable line is the line that was read, possibly truncated. The boolean flag drain means that the rest of a long line still exists and needs to be consumed.

This simple code has a drawback: a truncated line is not returned immediately, but only after the whole line has been read up to the newline. If this is not desired, store the flag in self._drain (it needs to be initialized in __init__) and do the "drain" on the next call.

class TruncatingStreamReader(asyncio.StreamReader):

    async def readline(self):
        line, drain = await self._truncating_readline()
        while drain:
            discard, drain = await self._truncating_readline()
        return line

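The low level, based on the standard asyncio code; see the comments there if needed. Note that it uses the private attributes _buffer and _limit and the private method _maybe_resume_transport of StreamReader, so it may need adjusting for other Python versions. I marked a section below with the comment "we have actually the whole line". This is the case when the whole line is in the buffer, but happens to be longer than the limit. You could return it untruncated, but that would violate the length limit contract.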

    async def _truncating_readline(self):
        sep = b'\n'
        seplen = len(sep) # i.e. 1
        try:
            line = await self.readuntil(sep)
            return (line, False)
        except asyncio.IncompleteReadError as e:
            line = e.partial
            return (line, False)
        except asyncio.LimitOverrunError as e:
            if self._buffer.startswith(sep, e.consumed):
                # we have actually the whole line ...
                line = bytes(self._buffer[:self._limit])
                del self._buffer[:e.consumed + seplen]
                self._maybe_resume_transport()
                return (line, False)
            line = bytes(self._buffer[:self._limit])
            self._buffer.clear()
            self._maybe_resume_transport()
            return (line, True)
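For the variant suggested earlier (keeping the flag in self._drain so that a truncated line is returned at once), a minimal sketch could look like this. The class name DeferredDrainStreamReader and the demo() coroutine are hypothetical; like the code above, the class relies on private CPython StreamReader internals. The demo feeds the reader by hand instead of using a socket, so the first, unterminated line is returned before its newline has even arrived.

```python
import asyncio


class DeferredDrainStreamReader(asyncio.StreamReader):
    """Return a truncated line immediately; skip its tail on the next call.

    Relies on private StreamReader attributes (_buffer, _limit,
    _maybe_resume_transport), exactly like TruncatingStreamReader.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._drain = False  # True while the tail of a long line is unread

    async def readline(self):
        while self._drain:  # first finish skipping the previous long line
            _, self._drain = await self._truncating_readline()
        line, self._drain = await self._truncating_readline()
        return line

    async def _truncating_readline(self):
        # Same logic as TruncatingStreamReader._truncating_readline.
        sep = b'\n'
        try:
            return await self.readuntil(sep), False
        except asyncio.IncompleteReadError as e:
            return e.partial, False  # EOF: last, unterminated line
        except asyncio.LimitOverrunError as e:
            line = bytes(self._buffer[:self._limit])
            if self._buffer.startswith(sep, e.consumed):
                # whole line is buffered: drop it, including the separator
                del self._buffer[:e.consumed + len(sep)]
                drain = False
            else:
                # separator not received yet: drop what we have now
                self._buffer.clear()
                drain = True
            self._maybe_resume_transport()
            return line, drain


async def demo():
    reader = DeferredDrainStreamReader(limit=10)
    reader.feed_data(b"0123456789ABCDEFGHIJ")  # newline has not arrived yet
    first = await reader.readline()            # returned without waiting
    reader.feed_data(b"KLM\nshort\n")          # tail of the long line, then more
    reader.feed_eof()
    return [first] + [line async for line in reader]


print(asyncio.run(demo()))
# [b'0123456789', b'short\n']
```

The cost of this design is that the tail of a long line is read and thrown away lazily, at the start of the following readline() call.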



A test program:

async def test():
    r, w = await open_trunc_connection(
        '127.0.0.1', 54321, limit=10)
    async for line in r:
        print(line)
    w.close()
    await w.wait_closed()

if __name__ == "__main__":
    asyncio.run(test())

I made a test "server" with this Linux command: ncat -l 54321 < file.txt. It must be restarted for each connection.
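If restarting ncat is a nuisance, a small asyncio stand-in can serve the same file to every connection. This is only a sketch: start_file_server and main are hypothetical names, and each client receives the whole file followed by EOF, which ends the async for loop in test().

```python
import asyncio


async def start_file_server(path, host="127.0.0.1", port=54321):
    """Serve the bytes of `path` to every client, then close that
    connection (a hypothetical stand-in for `ncat -l 54321 < file.txt`)."""
    with open(path, "rb") as f:
        data = f.read()  # read once; every connection gets the same bytes

    async def handle(reader, writer):
        writer.write(data)
        await writer.drain()
        writer.close()  # the client sees EOF, which ends its `async for`
        await writer.wait_closed()

    return await asyncio.start_server(handle, host, port)


async def main(path="file.txt"):
    server = await start_file_server(path)
    async with server:
        await server.serve_forever()  # stop with Ctrl+C

# Run with: asyncio.run(main())
```

Start it in one terminal with asyncio.run(main()), then run the test program as often as you like.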

VPfB answered Oct 23 '25 01:10