
How to use io to generate in-memory data streams as file-like objects?

I would like to create an in-memory data stream (like a temp file) in Python. One thread fills the stream with data, and another one consumes it.

After reading the documentation for io (Core tools for working with streams), the io module looks like the best choice for this.

So I put together a simple example:

#!/usr/local/bin/python3
# encoding: utf-8

import io

if __name__ == '__main__':
    a = io.BytesIO()
    a.write("hello".encode())
    txt = a.read(100)
    txt = txt.decode("utf-8")
    print(txt) 

My example does not work: "hello" does not seem to be written to a and cannot be read back afterwards. So where is my error? How do I have to change my code to get a file-like object in memory?

Stefan Jaritz asked Sep 05 '18 10:09




1 Answer

As @dhilmathy and @ShadowRanger pointed out, io.BytesIO() does not have separate read and write pointers. It keeps a single position, which sits at the end of the data after a write, so a read that follows immediately returns nothing.

I worked around this by creating a simple class that keeps its own read pointer and remembers the number of bytes written. When the number of bytes read equals the number of bytes written, the buffer is truncated to save memory.
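To make the single-position behaviour concrete, here is a minimal sketch using only the standard io module. The variable names are just for illustration; it shows why the read in the question comes back empty and two simple ways to get at the data.

```python
import io

buf = io.BytesIO()
buf.write(b"hello")          # the position is now 5, at the end of the data

at_end = buf.read(100)       # nothing lies after the position, so this is b""

buf.seek(0)                  # rewind the shared position to the start
from_start = buf.read(100)   # now the read returns b"hello"

whole = buf.getvalue()       # full contents, independent of the position

print(at_end, from_start, whole)
```

Rewinding with seek(0) is enough for a write-once, read-once buffer, but it does not help when writes and reads are interleaved, which is what the class below is for.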

My solution so far:

#!/usr/local/bin/python3
# encoding: utf-8

import io
import threading

class memoryStreamIO(io.BytesIO):
    """
    memoryStreamIO

    an in-memory, file-like stream object
    """

    def __init__(self):
        super().__init__()
        self._wIndex = 0
        self._rIndex = 0
        self._mutex = threading.Lock()

    def write(self, d : bytes):
        with self._mutex:
            # read() and seek() move the shared position, so jump back
            # to the write position before appending
            super().seek(self._wIndex)
            r = super().write(d)
            self._wIndex += len(d)
        return r

    def read(self, n : int):
        with self._mutex:
            super().seek(self._rIndex)
            r = super().read(n)
            self._rIndex += len(r)
            # once everything written has been read, shrink the buffer to save memory
            if self._rIndex == self._wIndex:
                super().truncate(0)
                super().seek(0)
                self._rIndex = 0
                self._wIndex = 0
        return r

    def seek(self, n):
        with self._mutex:
            self._rIndex = n
            r = super().seek(n)
        return r


if __name__ == '__main__':
    a = memoryStreamIO()

    a.write("hello".encode())
    txt = (a.read(100)).decode()
    print(txt)

    a.write("abc".encode())
    txt = (a.read(100)).decode()
    print(txt)

Stefan Jaritz answered Oct 06 '22 00:10