 

Asyncio decode utf-8 with StreamReader

I am getting used to asyncio and find the task handling quite nice, but it can be difficult to mix async libraries with traditional I/O libraries. The problem I am currently facing is how to properly decode an async StreamReader.

The simplest solution is to read() chunks of bytes and then decode each chunk, as in the code below. (In my real program, I wouldn't print each chunk, but decode it into a string and pass it to another method for processing):

import asyncio
import aiohttp

async def get_data(port):
    url = 'http://localhost:{}/'.format(port)
    r = await aiohttp.get(url)        # module-level get() from the aiohttp 0.x era
    stream = r.content                # the response body, an async StreamReader
    while not stream.at_eof():
        data = await stream.read(4)   # read a small chunk of raw bytes
        print(data.decode('utf-8'))

This works fine, until there is a UTF-8 character that is split between two chunks. For example, if the response is b'M\xc3\xa4dchen mit Bi\xc3\x9f\n', then reading chunks of 3 will work, but chunks of 4 will not (as \xc3 and \x9f end up in different chunks, and decoding the chunk ending with \xc3 raises the following error):

UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc3 in position 3: unexpected end of data
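
For reference, the failure is easy to reproduce without a server by slicing the sample response directly (a minimal sketch using the bytes from above):

payload = b'M\xc3\xa4dchen mit Bi\xc3\x9f\n'

# With a chunk size of 4, the two bytes of 'ß' (\xc3\x9f) land in
# different chunks, so decoding the chunk b' Bi\xc3' fails.
for i in range(0, len(payload), 4):
    print(payload[i:i + 4].decode('utf-8'))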

I looked at proper solutions to this problem, and in the blocking world at least, the answer seems to be either io.TextIOWrapper or codecs.StreamReaderWriter (the differences between which are discussed in PEP 400). However, both of these rely on typical blocking streams.

I spent 30 minutes searching for examples with asyncio and kept finding my decode() solution. Does anyone know of a better approach, or is this a missing feature in Python's asyncio?

For reference, here are the results from using the two "standard" decoders with async streams.

Using the codec stream reader:

r = yield from aiohttp.get(url)
decoder = codecs.getreader('utf-8')
stream = decoder(r.content)

Exception:

File "echo_client.py", line 13, in get_data
  data = yield from stream.read(4)
File "/usr/lib/python3.5/codecs.py", line 497, in read
  data = self.bytebuffer + newdata
TypeError: can't concat bytes to generator

(codecs' read() calls the underlying stream's read() directly rather than using yield from or await, so it receives a coroutine/generator object instead of bytes)

I also tried wrapping stream with io.TextIOWrapper:

stream = TextIOWrapper(r.content)

But that leads to the following:

File "echo_client.py", line 10, in get_data
  stream = TextIOWrapper(r.content)
AttributeError: 'FlowControlStreamReader' object has no attribute 'readable'

P.S. If you want a sample test case for this, please look at this gist. You can run it with Python 3.5 to reproduce the error. If you change the chunk size from 4 to 3 (or 30), it will work correctly.

EDIT

The accepted answer worked like a charm. Thanks! If someone else runs into this issue, here is a simple wrapper class I made to handle the decoding on a StreamReader:

import codecs

class DecodingStreamReader:
    """Wraps an async StreamReader and decodes its bytes incrementally."""

    def __init__(self, stream, encoding='utf-8', errors='strict'):
        self.stream = stream
        # The incremental decoder buffers partial multi-byte sequences
        # between read() calls, so split characters decode correctly.
        self.decoder = codecs.getincrementaldecoder(encoding)(errors=errors)

    async def read(self, n=-1):
        data = await self.stream.read(n)
        if isinstance(data, (bytes, bytearray)):
            data = self.decoder.decode(data)
        return data

    def at_eof(self):
        return self.stream.at_eof()
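
A short usage sketch (inside an async function, assuming the same aiohttp response object r as above; the names are illustrative):

stream = DecodingStreamReader(r.content)
while not stream.at_eof():
    text = await stream.read(4)   # already-decoded str, safe across chunk boundaries
    print(text, end='')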
asked Jan 27 '16 by Ethan Frey


1 Answer

You can use an IncrementalDecoder:

Utf8Decoder = codecs.getincrementaldecoder('utf-8')

With your example:

decoder = Utf8Decoder(errors='strict')
while not stream.at_eof():
    data = await stream.read(4)
    print(decoder.decode(data), end='')

Output:

Mädchen mit Biß
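
This works because the incremental decoder buffers an incomplete multi-byte sequence between calls instead of raising. If the stream could end mid-character, you may also want to flush the decoder at EOF so truncated input raises instead of being silently dropped (a small sketch of both behaviors):

import codecs

dec = codecs.getincrementaldecoder('utf-8')(errors='strict')
print(repr(dec.decode(b' Bi\xc3')))  # ' Bi'  -- the trailing \xc3 is buffered
print(repr(dec.decode(b'\x9f\n')))   # 'ß\n' -- completed by the next chunk
dec.decode(b'', final=True)          # flush; raises UnicodeDecodeError on truncated input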
answered Oct 18 '22 by Vincent