Reading streaming http response with Python "requests" library

I am trying to consume an event stream provided by the Kubernetes api using the requests module. I have run into what looks like a buffering problem: the requests module seems to lag by one event.

I have code that looks something like this:

import requests

r = requests.get('http://localhost:8080/api/v1beta1/watch/services',
                 stream=True)

for line in r.iter_lines():
    print('LINE:', line)

As Kubernetes emits event notifications, this code displays each event only when the next one arrives, so it always lags one event behind. That makes it almost useless for code that needs to respond to service add/delete events.

I have worked around this by spawning curl in a subprocess instead of using the requests library:

import subprocess

p = subprocess.Popen(['curl', '-sfN',
                      'http://localhost:8080/api/watch/services'],
                     stdout=subprocess.PIPE,
                     bufsize=1)

# readline returns b'' at end of stream, which stops the iterator.
for line in iter(p.stdout.readline, b''):
    print('LINE:', line)

This works, but at the expense of some flexibility. Is there a way to avoid this buffering problem with the requests library?

asked Jan 25 '15 at 16:01 by larsks


1 Answer

This behavior is due to a buggy implementation of the iter_lines method in the requests library.

iter_lines iterates over the response content in chunk_size blocks of data using the iter_content iterator. If fewer than chunk_size bytes are available for reading from the remote server (which is typically the case when reading the last line of output), the read operation blocks until chunk_size bytes are available.
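The chunk-then-split approach can be modeled without any network code. The following is a minimal sketch, assuming newline-terminated events; lines_from_chunks is a hypothetical helper written for illustration and is not the code used inside requests. It shows that a line can only be emitted once the chunk containing its newline has been delivered, so any delay in delivering a chunk delays the line.

```python
def lines_from_chunks(chunks):
    """Yield complete lines from an iterable of byte chunks.

    Hypothetical helper for illustration; assumes b"\n" line endings.
    """
    pending = b""
    for chunk in chunks:
        pending += chunk
        # Everything before the last newline is complete; the rest must wait
        # for a later chunk.
        *complete, pending = pending.split(b"\n")
        for line in complete:
            yield line
    if pending:
        # The stream ended without a trailing newline.
        yield pending


# Three chunks, with the second line split across a chunk boundary.
result = list(lines_from_chunks([b"a\nb", b"c\n", b"d"]))
print(result)
```

Here the line b"bc" is not produced until the second chunk arrives, even though its first byte came in with the first chunk.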

I have written my own iter_lines routine that operates correctly:

import os


def iter_lines(fd, chunk_size=1024):
    '''Iterates over the content of a file-like object line-by-line.'''

    pending = None

    while True:
        chunk = os.read(fd.fileno(), chunk_size)
        if not chunk:
            break

        if pending is not None:
            chunk = pending + chunk
            pending = None

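        lines = chunk.splitlines()

        # Hold back the last line only if the chunk stopped mid-line,
        # i.e. it does not end with a newline.
        if lines and not chunk.endswith(b'\n'):
            pending = lines.pop()

        for line in lines:
            yield line

    if pending:
        yield pending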

This works because os.read returns whatever data is available, up to chunk_size bytes, rather than waiting for a buffer to fill.
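The short-read behavior of os.read is easy to check locally. This is a small sketch that uses an os.pipe as a stand-in for the socket (an assumption made purely for illustration; the names read_fd and write_fd are not from the code above). A single short event is written, and os.read is asked for up to 1024 bytes.

```python
import os

# A pipe stands in for the network socket (assumption for illustration).
read_fd, write_fd = os.pipe()

# The "server" sends one short event, far fewer than 1024 bytes.
os.write(write_fd, b"event-1\n")

# os.read returns as soon as any data is available, up to the requested size,
# so this call does not wait for 1024 bytes to accumulate.
data = os.read(read_fd, 1024)
print(data)

os.close(read_fd)
os.close(write_fd)
```

The call returns the 8 bytes that were written instead of blocking, which is the property the iter_lines routine relies on.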

answered Sep 29 '22 at 04:09 by larsks