Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Wrap an io.BufferedIOBase such that it becomes seek-able

I was trying to craft a response to a question about streaming audio from a HTTP server, then play it with PyGame. I had the code mostly complete, but hit an error where the PyGame music functions tried to seek() on the urllib.HTTPResponse object.

According to the urlib docs, the urllib.HTTPResponse object (since v3.5) is an io.BufferedIOBase. I expected this would make the stream seek()able, however it does not.

Is there a way to wrap the io.BufferedIOBase such that it is smart enough to buffer enough data to handle the seek operation?

import pygame
import urllib.request
import io

# Window size
WINDOW_WIDTH  = 400
WINDOW_HEIGHT = 400
# background colour
SKY_BLUE      = (161, 255, 254)

### Begin the streaming of a file
### Return the urlib.HTTPResponse, a file-like-object
def openURL( url ):
    result = None

    try:
        http_response = urllib.request.urlopen( url )
        print( "streamHTTP() - Fetching URL [%s]" % ( http_response.geturl() ) )
        print( "streamHTTP() - Response Status [%d] / [%s]" % ( http_response.status, http_response.reason ) )
        result = http_response
    except:
        print( "streamHTTP() - Error Fetching URL [%s]" % ( url ) )

    return result


### MAIN
pygame.init()
window  = pygame.display.set_mode( ( WINDOW_WIDTH, WINDOW_HEIGHT ) )
pygame.display.set_caption("Music Streamer")


clock = pygame.time.Clock()
done = False
while not done:

    # Handle user-input
    for event in pygame.event.get():
        if ( event.type == pygame.QUIT ):
            done = True
    # Keys
    keys = pygame.key.get_pressed()
    if ( keys[pygame.K_UP] ):
        if ( pygame.mixer.music.get_busy() ):
            print("busy")
        else:
            print("play")
            remote_music = openURL( 'http://127.0.0.1/example.wav' )
            if ( remote_music != None and remote_music.status == 200 ):
                pygame.mixer.music.load( io.BufferedReader( remote_music ) )
                pygame.mixer.music.play()

    # Re-draw the screen
    window.fill( SKY_BLUE )

    # Update the window, but not more than 60fps
    pygame.display.flip()
    clock.tick_busy_loop( 60 )

pygame.quit()

When this code runs, and Up is pushed, it fails with the error:

streamHTTP() - Fetching URL [http://127.0.0.1/example.wav]
streamHTTP() - Response Status [200] / [OK]
io.UnsupportedOperation: seek
io.UnsupportedOperation: File or stream is not seekable.
io.UnsupportedOperation: seek
io.UnsupportedOperation: File or stream is not seekable.
Traceback (most recent call last):
  File "./sound_stream.py", line 57, in <module>
    pygame.mixer.music.load( io.BufferedReader( remote_music ) )
pygame.error: Unknown WAVE format

I also tried re-opening the the io stream, and various other re-implementations of the same sort of thing.

like image 510
Kingsley Avatar asked Nov 04 '19 23:11

Kingsley


People also ask

What is text io wrapper?

TextIOWrapper class TextIOWrapper provides methods and attributes which helps us to read or write data to and from the file. The following table lists some commonly used methods of _io. TextIOWrapper class. Reads the specified number of characters from the file and returns them as string.

What is io BytesIO ()?

StringIO and BytesIO are methods that manipulate string and bytes data in memory. StringIO is used for string data and BytesIO is used for binary data. This classes create file like object that operate on string data. The StringIO and BytesIO classes are most useful in scenarios where you need to mimic a normal file.

What is io module in Python?

The io module provides Python's main facilities for dealing with various types of I/O. There are three main types of I/O: text I/O, binary I/O and raw I/O. These are generic categories, and various backing stores can be used for each of them.


1 Answers

Seeking seeking

According to the urlib docs, the urllib.HTTPResponse object (since v3.5) is an io.BufferedIOBase. I expected this would make the stream seek()able, however it does not.

That's correct. The io.BufferedIOBase interface doesn't guarantee the I/O object is seekable. For HTTPResponse objects, IOBase.seekable() returns False:

>>> import urllib.request
>>> response = urllib.request.urlopen("http://httpbin.org/get")
>>> response
<http.client.HTTPResponse object at 0x110870ca0>
>>> response.seekable()
False

That's because the BufferedIOBase implementation offered by HTTPResponse is wrapping a socket object, and sockets are not seekable either.

You can't wrap an BufferedIOBase object in a BufferedReader object and add seeking support. The Buffered* wrapper objects can only wrap RawIOBase types, and they rely on the wrapped object to provide seeking support. You would have to emulate seeking at raw I/O level, see below.

You can still provide the same functionality at a higher level, but take into account that seeking on remote data is a lot more involved; this isn't a simple change a simple OS variable that represents a file position on disk operation. For larger remote file data, seeking without backing the whole file on disk locally could be as sophisticated as using HTTP range requests and local (in memory or on-disk) buffers to balance sound play-back performance and minimising local data storage. Doing this correctly for a wide range of use-cases can be a lot of effort, so is certainly not part of the Python standard library.

If your sound files are small

If your HTTP-sourced sound files are small enough (a few MB at most) then just read the whole response into an in-memory io.BytesIO() file object. I really do not think it is worth making this more complicated than that, because the moment you have enough data to make that worth pursuing your files are large enough to take up too much memory!

So this would be more than enough if your sound files are smaller (no more than a few MB):

from io import BytesIO
import urllib.error
import urllib.request

def open_url(url):
    try:
        http_response = urllib.request.urlopen(url)
        print(f"streamHTTP() - Fetching URL [{http_response.geturl()}]")
        print(f"streamHTTP() - Response Status [{http_response.status}] / [{http_response.reason}]")
    except urllib.error.URLError:
        print("streamHTTP() - Error Fetching URL [{url}]")
        return

    if http_response.status != 200:
        print("streamHTTP() - Error Fetching URL [{url}]")
        return

    return BytesIO(http_response.read())

This doesn't require writing a wrapper object, and because BytesIO is a native implementation, once the data is fully copied over, access to the data is faster than any Python-code wrapper could ever give you.

Note that this returns a BytesIO file object, so you no longer need to test for the response status:

remote_music = open_url('http://127.0.0.1/example.wav')
if remote_music is not None:
    pygame.mixer.music.load(remote_music)
    pygame.mixer.music.play()

If they are more than a few MB

Once you go beyond a few megabytes, you could try pre-loading the data into a local file object. You can make this more sophisticated by using a thread to have shutil.copyfileobj() copy most of the data into that file in the background and give the file to PyGame after loading just an initial amount of data.

By using an actual file object, you can actually help performance here, as PyGame will try to minimize interjecting itself between the SDL mixer and the file data. If there is an actual file on disk with a file number (the OS-level identifier for a stream, something that the SDL mixer library can make use of), then PyGame will operate directly on that and so minimize blocking the GIL (which in turn will help the Python portions of your game perform better!). And if you pass in a filename (just a string), then PyGame gets out of the way entirely and leaves all file operations over to the SDL library.

Here's such an implementation; this should, on normal Python interpreter exit, clean up the downloaded files automatically. It returns a filename for PyGame to work on, and finalizing downloading the data is done in a thread after the initial few KB has been buffered. It will avoid loading the same URL more than once, and I've made it thread-safe:

import shutil
import urllib.error
import urllib.request
from tempfile import NamedTemporaryFile
from threading import Lock, Thread

INITIAL_BUFFER = 1024 * 8  # 8kb initial file read to start URL-backed files
_url_files_lock = Lock()
# stores open NamedTemporaryFile objects, keeping them 'alive'
# removing entries from here causes the file data to be deleted.
_url_files = {}


def open_url(url):
    with _url_files_lock:
        if url in _url_files:
            return _url_files[url].name

    try:
        http_response = urllib.request.urlopen(url)
        print(f"streamHTTP() - Fetching URL [{http_response.geturl()}]")
        print(f"streamHTTP() - Response Status [{http_response.status}] / [{http_response.reason}]")
    except urllib.error.URLError:
        print("streamHTTP() - Error Fetching URL [{url}]")
        return

    if http_response.status != 200:
        print("streamHTTP() - Error Fetching URL [{url}]")
        return

    fileobj = NamedTemporaryFile()

    content_length = http_response.getheader("Content-Length")
    if content_length is not None:
        try:
            content_length = int(content_length)
        except ValueError:
            content_length = None
        if content_length:
            # create sparse file of full length
            fileobj.seek(content_length - 1)
            fileobj.write(b"\0")
            fileobj.seek(0)

    fileobj.write(http_response.read(INITIAL_BUFFER))
    with _url_files_lock:
        if url in _url_files:
            # another thread raced us to this point, we lost, return their
            # result after cleaning up here
            fileobj.close()
            http_response.close()
            return _url_files[url].name

        # store the file object for this URL; this keeps the file
        # open and so readable if you have the filename.
        _url_files[url] = fileobj

    def copy_response_remainder():
        # copies file data from response to disk, for all data past INITIAL_BUFFER
        with http_response:
            shutil.copyfileobj(http_response, fileobj)

    t = Thread(daemon=True, target=copy_response_remainder)
    t.start()

    return fileobj.name

Like the BytesIO() solution, the above returns either None or a value ready for passing to pass to pygame.mixer.music.load().

The above will probably not work if you try to immediately set an advanced playing position in your sound files, as later data may not yet have been copied into the file. It's a trade-off.

Seeking and finding third party libraries

If you need to have full seeking support on remote URLs and don't want to use on-disk space for them and don't want to have to worry about their size, you don't need to re-invent the HTTP-as-seekable-file wheel here. You could use an existing project that offers the same functionality. I found two that offer io.BufferedIOBase-based implementations:

  • smart_open
  • httpio

Both use HTTP Range requests to implement seeking support. Just use httpio.open(URL) or smart_open.open(URL) and pass that directly to pygame.mixer.music.load(); if the URL can't be opened, you can catch that by handling the IOError exception:

from smart_open import open as url_open  # or from httpio import open

try:
    remote_music = url_open('http://127.0.0.1/example.wav')
except IOError:
    pass
else:
    pygame.mixer.music.load(remote_music)
    pygame.mixer.music.play()

smart_open uses an in-memory buffer to satisfy reads of a fixed size, but creates a new HTTP Range request for every call to seek that changes the current file position, so performance may vary. Since the SDL mixer executes a few seeks on audio files to determine their type, I expect this to be a little slower.

httpio can buffer blocks of data and so might handle seeks better, but from a brief glance at the source code, when actually setting a buffer size the cached blocks are never evicted from memory again so you'd end up with the whole file in memory, eventually.

Implementing seeking ourselves, via io.RawIOBase

And finally, because I'm not able to find efficient HTTP-Range-backed I/O implementations, I wrote my own. The following implements the io.RawIOBase interface, specifically so you can then wrap the object in a io.BufferedIOReader() and so delegate caching to a caching buffer that will be managed correctly when seeking:

import io
from copy import deepcopy
from functools import wraps
from typing import cast, overload, Callable, Optional, Tuple, TypeVar, Union
from urllib.request import urlopen, Request

T = TypeVar("T")

@overload
def _check_closed(_f: T) -> T: ...
@overload
def _check_closed(*, connect: bool, default: Union[bytes, int]) -> Callable[[T], T]: ...

def _check_closed(
    _f: Optional[T] = None,
    *,
    connect: bool = False,
    default: Optional[Union[bytes, int]] = None,
) -> Union[T, Callable[[T], T]]:
    def decorator(f: T) -> T:
        @wraps(cast(Callable, f))
        def wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed file.")
            if connect and self._fp is None or self._fp.closed:
                self._connect()
                if self._fp is None:
                    # outside the seekable range, exit early
                    return default
            try:
                return f(self, *args, **kwargs)
            except Exception:
                self.close()
                raise
            finally:
                if self._range_end and self._pos >= self._range_end:
                    self._fp.close()
                    del self._fp

        return cast(T, wrapper)

    if _f is not None:
        return decorator(_f)

    return decorator

def _parse_content_range(
    content_range: str
) -> Tuple[Optional[int], Optional[int], Optional[int]]:
    """Parse a Content-Range header into a (start, end, length) tuple"""
    units, *range_spec = content_range.split(None, 1)
    if units != "bytes" or not range_spec:
        return (None, None, None)
    start_end, _, size = range_spec[0].partition("/")
    try:
        length: Optional[int] = int(size)
    except ValueError:
        length = None
    start_val, has_start_end, end_val = start_end.partition("-")
    start = end = None
    if has_start_end:
        try:
            start, end = int(start_val), int(end_val)
        except ValueError:
            pass
    return (start, end, length)

class HTTPRawIO(io.RawIOBase):
    """Wrap a HTTP socket to handle seeking via HTTP Range"""

    url: str
    closed: bool = False
    _pos: int = 0
    _size: Optional[int] = None
    _range_end: Optional[int] = None
    _fp: Optional[io.RawIOBase] = None

    def __init__(self, url_or_request: Union[Request, str]) -> None:
        if isinstance(url_or_request, str):
            self._request = Request(url_or_request)
        else:
            # copy request objects to avoid sharing state
            self._request = deepcopy(url_or_request)
        self.url = self._request.full_url
        self._connect(initial=True)

    def readable(self) -> bool:
        return True

    def seekable(self) -> bool:
        return True

    def close(self) -> None:
        if self.closed:
            return
        if self._fp:
            self._fp.close()
            del self._fp
        self.closed = True

    @_check_closed
    def tell(self) -> int:
        return self._pos

    def _connect(self, initial: bool = False) -> None:
        if self._fp is not None:
            self._fp.close()
        if self._size is not None and self._pos >= self._size:
            # can't read past the end
            return
        request = self._request
        request.add_unredirected_header("Range", f"bytes={self._pos}-")
        response = urlopen(request)

        self.url = response.geturl()  # could have been redirected
        if response.status not in (200, 206):
            raise OSError(
                f"Failed to open {self.url}: "
                f"{response.status} ({response.reason})"
            )

        if initial:
            # verify that the server supports range requests. Capture the
            # content length if available
            if response.getheader("Accept-Ranges") != "bytes":
                raise OSError(
                    f"Resource doesn't support range requests: {self.url}"
                )
            try:
                length = int(response.getheader("Content-Length", ""))
                if length >= 0:
                    self._size = length
            except ValueError:
                pass

        # validate the range we are being served
        start, end, length = _parse_content_range(
            response.getheader("Content-Range", "")
        )
        if self._size is None:
            self._size = length
        if (start is not None and start != self._pos) or (
            length is not None and length != self._size
        ):
            # non-sensical range response
            raise OSError(
                f"Resource at {self.url} served invalid range: pos is "
                f"{self._pos}, range {start}-{end}/{length}"
            )
        if self._size and end is not None and end + 1 < self._size:
            # incomplete range, not reaching all the way to the end
            self._range_end = end
        else:
            self._range_end = None

        fp = cast(io.BufferedIOBase, response.fp)  # typeshed doesn't name fp
        self._fp = fp.detach()  # assume responsibility for the raw socket IO

    @_check_closed
    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        relative_to = {
            io.SEEK_SET: 0,
            io.SEEK_CUR: self._pos,
            io.SEEK_END: self._size,
        }.get(whence)
        if relative_to is None:
            if whence == io.SEEK_END:
                raise IOError(
                    f"Can't seek from end on unsized resource {self.url}"
                )
            raise ValueError(f"whence value {whence} unsupported")
        if -offset > relative_to:  # can't seek to a point before the start
            raise OSError(22, "Invalid argument")

        self._pos = relative_to + offset
        # there is no point in optimising an existing connection
        # by reading from it if seeking forward below some threshold.
        # Use a BufferedIOReader to avoid seeking by small amounts or by 0
        if self._fp:
            self._fp.close()
            del self._fp
        return self._pos

    # all read* methods delegate to the SocketIO object (itself a RawIO
    # implementation).

    @_check_closed(connect=True, default=b"")
    def read(self, size: int = -1) -> Optional[bytes]:
        assert self._fp is not None  # show type checkers we already checked
        res = self._fp.read(size)
        if res is not None:
            self._pos += len(res)
        return res

    @_check_closed(connect=True, default=b"")
    def readall(self) -> bytes:
        assert self._fp is not None  # show type checkers we already checked
        res = self._fp.readall()
        self._pos += len(res)
        return res

    @_check_closed(connect=True, default=0)
    def readinto(self, buffer: bytearray) -> Optional[int]:
        assert self._fp is not None  # show type checkers we already checked
        n = self._fp.readinto(buffer)
        self._pos += n or 0
        return n

Remember that this is a RawIOBase object, which you really want to wrap in a BufferReader(). Doing so in open_url() looks like this:

def open_url(url, *args, **kwargs):
    return io.BufferedReader(HTTPRawIO(url), *args, **kwargs)

This gives you fully buffered I/O, with full support seeking, over a remote URL, and the BufferedReader implementation will minimise resetting the HTTP connection when seeking. I've found that using this with the PyGame mixer, only single HTTP connection is made, as all the test seeks are within the default 8KB buffer.

like image 84
Martijn Pieters Avatar answered Dec 08 '22 19:12

Martijn Pieters