
Stream multiple files into a readable object in Python

I have a function which processes binary data from a file using the file.read(len) method. However, my file is huge and has been cut into many smaller files of 50 MB each. Is there some wrapper class that feeds many files into a buffered stream and provides a read() method?

The fileinput.FileInput class can do something like this, but it only supports line-by-line reading (its readline() method takes no arguments) and does not provide a read(len) method for reading a specified number of bytes.
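
Roughly what I am after (the names below are made up for illustration):

# Hypothetical sketch: process() needs one file-like object with read(n),
# but the data is split across many 50 MB part files.
def process(stream):
    header = stream.read(16)  # read a fixed-size binary record
    # ... parse the rest of the stream ...

part_names = ['data.bin.000', 'data.bin.001', 'data.bin.002']
# Wanted: a wrapper that concatenates the parts behind a single read(), e.g.
# process(some_wrapper(open(name, 'rb') for name in part_names))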

Asked Jul 02 '14 by xivaxy



2 Answers

It's quite easy to concatenate iterables with itertools.chain:

from itertools import chain

def read_by_chunks(file_objects, block_size=1024):
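    # One chunk-iterator per file: iter(callable, sentinel) keeps calling
    # f.read(block_size) until it returns the sentinel '' (EOF in text mode).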
    readers = (iter(lambda f=f: f.read(block_size), '') for f in file_objects)
    return chain.from_iterable(readers)

You can then do:

for chunk in read_by_chunks([f1, f2, f3, f4], 4096):
    handle(chunk)

This processes the files in sequence while reading them in chunks of 4096 bytes.

If you need to provide an object with a read method because some other function expects it, you can write a very simple wrapper:

class ConcatFiles(object):
    def __init__(self, files, block_size):
        self._block_size = block_size  # remembered for the variable-size read() shown below
        self._reader = read_by_chunks(files, block_size)

    def __iter__(self):
        return self._reader

    def read(self):
        return next(self._reader, '')
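
A quick usage sketch (the file names and the handle() consumer are placeholders; the files are opened in text mode here to match the '' sentinel above, see the note on binary mode below):

with open('part1.txt') as f1, open('part2.txt') as f2:
    stream = ConcatFiles([f1, f2], block_size=4096)
    while True:
        chunk = stream.read()
        if not chunk:
            break
        handle(chunk)  # handle() stands in for whatever consumes the data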

This, however, only uses a fixed block size. It's possible to support a block_size parameter for read() by doing something like:

def read(self, block_size=None):
    block_size = block_size or self._block_size
    total_read = 0
    chunks = []

    for chunk in self._reader:
        chunks.append(chunk)
        total_read += len(chunk)
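        # Strict '>' means we only split once we have more than block_size bytes,
        # so the remainder pushed back into the iterator is never the empty string.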
        if total_read > block_size:
            contents = ''.join(chunks)
            self._reader = chain([contents[block_size:]], self._reader)
            return contents[:block_size]
    return ''.join(chunks)

Note: if you are reading in binary mode, you should replace the empty strings '' in the code with empty bytes b'' (otherwise the iter(..., '') sentinel is never reached and the iteration will not stop).

Answered by Bakuriu

Instead of converting the list of streams into a generator, as the other answer does, you can chain the streams together and then use the file interface:

import io

def chain_streams(streams, buffer_size=io.DEFAULT_BUFFER_SIZE):
    """
    Chain an iterable of streams together into a single buffered stream.
    Usage:
        def generate_open_file_streams():
            for file in filenames:
                yield open(file, 'rb')
        f = chain_streams(generate_open_file_streams())
        f.read()
    """

    class ChainStream(io.RawIOBase):
        def __init__(self):
            self.leftover = b''
            self.stream_iter = iter(streams)
            try:
                self.stream = next(self.stream_iter)
            except StopIteration:
                self.stream = None

        def readable(self):
            return True

        def _read_next_chunk(self, max_length):
            # Return 0 or more bytes from the current stream, yielding any
            # leftover bytes first. Returns b'' when the current stream is
            # exhausted or there is no current stream.
            if self.leftover:
                return self.leftover
            elif self.stream is not None:
                return self.stream.read(max_length)
            else:
                return b''

        def readinto(self, b):
            buffer_length = len(b)
            chunk = self._read_next_chunk(buffer_length)
            while len(chunk) == 0:
                # move to next stream
                if self.stream is not None:
                    self.stream.close()
                try:
                    self.stream = next(self.stream_iter)
                    chunk = self._read_next_chunk(buffer_length)
                except StopIteration:
                    # No more streams to chain together
                    self.stream = None
                    return 0  # indicate EOF
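            # Keep any bytes beyond the caller's buffer for the next readinto() call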
            output, self.leftover = chunk[:buffer_length], chunk[buffer_length:]
            b[:len(output)] = output
            return len(output)

    return io.BufferedReader(ChainStream(), buffer_size=buffer_size)

Then use it like any other file/stream:

f = chain_streams(open_files_or_chunks)
f.read(1024)  # any number of bytes; reads cross file boundaries transparently
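
For instance, applied to the scenario in the question (the file names, the glob pattern, and process() below are made up for illustration):

import glob

def open_parts():
    # Lazily open the 50 MB part files in order
    for name in sorted(glob.glob('data.bin.*')):
        yield open(name, 'rb')

f = chain_streams(open_parts())
header = f.read(16)          # a read can span a file boundary transparently
while True:
    block = f.read(65536)
    if not block:
        break
    process(block)           # process() stands in for the real handler
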
Answered by Hardbyte