Stream a non-seekable file-like object to multiple sinks

I have a non-seekable file-like object. In particular it is a file of indeterminate size coming from an HTTP request.

import requests
fileobj = requests.get(url, stream=True).raw

I am streaming this file to an Amazon AWS SDK call which writes the contents to Amazon S3. This works fine.

import boto3
s3 = boto3.resource('s3')
s3.Bucket('my-bucket').upload_fileobj(fileobj, 'target-file-name')

However, at the same time as streaming it to S3 I want to also stream the data to another process. This other process may not need the entire stream and might stop listening at some point; this is fine and should not affect the stream to S3.

It's important I don't use too much memory, since some of these files could be enormous. I don't want to write anything to disk for the same reason.

I don't mind if either sink is slowed down because the other is slow, as long as S3 eventually gets the entire file and the data goes to both sinks (or rather, to each one that still wants it).

What's the best way to go about this in Python (3)? I know I can't just pass the same file object to both sinks, such as

s3.Bucket('my-bucket').upload_fileobj(fileobj, 'target-file-name')
# At the same time somehow as
process = subprocess.Popen(['myapp'], stdin=fileobj)

I think I could write a wrapper for the file-like object which passes any data read not only to the caller (which would be the S3 sink) but also to the other process. Something like

class MyFilewrapper(object):
    def __init__(self, fileobj):
        self._fileobj = fileobj
        self._process = subprocess.Popen(['myapp'], stdin=subprocess.PIPE)
    def read(self, size=-1):
        data = self._fileobj.read(size)
        self._process.stdin.write(data)
        return data

filewrapper = MyFilewrapper(fileobj)
s3.Bucket('my-bucket').upload_fileobj(filewrapper, 'target-file-name')

But is there a better way to do it? Perhaps something like

streams = StreamDuplicator(fileobj, streams=2)
s3.Bucket('my-bucket').upload_fileobj(streams[0], 'target-file-name')
# At the same time somehow as
process = subprocess.Popen(['myapp'], stdin=streams[1])
asked Sep 05 '16 by tremby


2 Answers

The discomfort with your MyFilewrapper solution arises because the IO loop inside upload_fileobj is now in control of feeding data to a subprocess that is, strictly speaking, unrelated to the upload.

A "proper" solution would involve an upload API that provides a file-like object to which an outside loop can write the upload stream. That would allow you to feed the data to both target streams "cleanly".

The following example shows the basic concept. The fictional startupload method provides the file-like object for uploading. Of course you would need to add proper error handling etc.

fileobj = requests.get(url, stream=True).raw

upload_fd = s3.Bucket('my-bucket').startupload('target-file-name')  # fictional API
other_fd = ...  # Popen or whatever

buf = memoryview(bytearray(4096))
while True:
    # read a chunk from the source directly into the reusable buffer
    r = fileobj.readinto(buf)
    if r == 0:
        break

    # feed the same chunk to both sinks
    read_slice = buf[:r]
    upload_fd.write(read_slice)
    other_fd.write(read_slice)
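
boto3 does not offer such a write-side API out of the box, but a similar structure can be approximated with an OS pipe: run upload_fileobj on the read end in a background thread and keep the outside loop in control of the write end. The following is only a rough, untested sketch of that idea; url, the bucket name, the target key and the myapp command are placeholders taken from the question.

import os
import subprocess
import threading

import boto3
import requests

s3 = boto3.resource('s3')
fileobj = requests.get(url, stream=True).raw

# Pipe whose read end is consumed by upload_fileobj; the write end stays with us.
read_fd, write_fd = os.pipe()
upload_src = os.fdopen(read_fd, 'rb')
upload_dst = os.fdopen(write_fd, 'wb')

uploader = threading.Thread(
    target=s3.Bucket('my-bucket').upload_fileobj,
    args=(upload_src, 'target-file-name'))
uploader.start()

process = subprocess.Popen(['myapp'], stdin=subprocess.PIPE)
proc_stdin = process.stdin

while True:
    chunk = fileobj.read(64 * 1024)
    if not chunk:
        break
    upload_dst.write(chunk)            # feed S3 via the pipe
    if proc_stdin is not None:
        try:
            proc_stdin.write(chunk)    # feed the subprocess
        except BrokenPipeError:
            proc_stdin = None          # it stopped listening; keep feeding S3

upload_dst.close()                     # EOF lets upload_fileobj finish
if proc_stdin is not None:
    proc_stdin.close()
uploader.join()

Note that the outside loop blocks on upload_dst.write if S3 is the slower sink, which matches the requirement that slowing down either sink is acceptable.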
answered by code_onkel


Here is an implementation of StreamDuplicator with the requested functionality and usage model. I verified that it correctly handles the case where one of the sinks stops consuming its stream half-way through.

Usage:

./streamduplicator.py <sink1_command> <sink2_command> ...

Example:

$ seq 100000 | ./streamduplicator.py "sed -n '/0000/ {s/^/sed: /;p}'" "grep 1234"

Output:

sed: 10000
1234
11234
12340
12341
12342
12343
12344
12345
12346
12347
12348
12349
21234
sed: 20000
31234
sed: 30000
41234
sed: 40000
51234
sed: 50000
61234
sed: 60000
71234
sed: 70000
81234
sed: 80000
91234
sed: 90000
sed: 100000

streamduplicator.py:

#!/usr/bin/env python3

import sys
import os
from subprocess import Popen
from threading import Thread
from time import sleep
import shlex
import fcntl

WRITE_TIMEOUT=0.1

def write_or_timeout(stream, data, timeout):
    # Write 'data' to a non-blocking stream, retrying with exponential
    # back-off while no progress is made. The back-off and the remaining time
    # are reset after every partial write. Returns True once all data has
    # been written, False if 'timeout' elapses without progress.
    data_to_write = data[:]
    time_to_sleep = 1e-6
    time_remaining = 1.0 * timeout
    while time_to_sleep != 0:
        try:
            stream.write(data_to_write)
            return True
        except BlockingIOError as ex:
            data_to_write = data_to_write[ex.characters_written:]
            if ex.characters_written == 0:
                time_to_sleep *= 2
            else:
                time_to_sleep = 1e-6
                time_remaining = timeout
        time_to_sleep = min(time_remaining, time_to_sleep)
        sleep(time_to_sleep)
        time_remaining -= time_to_sleep

    return False


class StreamDuplicator(object):

    def __init__(self, stream, n, timeout=WRITE_TIMEOUT):
        self.stream = stream
        self.write_timeout = timeout
        self.pipereadstreams = []
        self.pipewritestreams = []
        for i in range(n):
            # One pipe per sink: the read end is handed out via __getitem__,
            # the write end is made non-blocking so that a stalled or closed
            # sink can be detected via a write timeout.
            (r, w) = os.pipe()
            readStream = open(r, 'rb')
            self.pipereadstreams.append(readStream)
            old_flags = fcntl.fcntl(w, fcntl.F_GETFL)
            fcntl.fcntl(w, fcntl.F_SETFL, old_flags | os.O_NONBLOCK)
            self.pipewritestreams.append(os.fdopen(w, 'wb'))

        # Pump data from the source to all pipes on a background thread.
        Thread(target=self).start()

    def __call__(self):
        while True:
            data = self.stream.read(1024*16)

            if len(data) == 0:
                break

            # Drop every sink that failed to accept the chunk within the
            # timeout; the remaining sinks keep receiving the stream.
            surviving_pipes = []
            for p in self.pipewritestreams:
                if write_or_timeout(p, data, self.write_timeout):
                    surviving_pipes.append(p)
            self.pipewritestreams = surviving_pipes


    def __getitem__(self, i):
        return self.pipereadstreams[i]


if __name__ == '__main__':
    # One sink subprocess per command-line argument, with a 3-second write timeout.
    n = len(sys.argv)
    streams = StreamDuplicator(sys.stdin.buffer, n - 1, 3)
    for (i, cmd) in zip(range(n - 1), sys.argv[1:]):
        Popen(shlex.split(cmd), stdin=streams[i])

Implementation limitations:

  • the use of fcntl to switch the pipe's write file descriptor to non-blocking mode probably makes it unusable on Windows.

  • a closed/unsubscribed sink is detected through a write timeout.

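For the original question, usage could look roughly like the sketch below. It is untested and assumes streamduplicator.py is importable as a module; url, the bucket name, the key and the myapp command are the placeholders from the question, and a generous timeout is passed so that a sink is not dropped merely because its consumer is slow to start up.

import subprocess

import boto3
import requests

from streamduplicator import StreamDuplicator  # the script above, on the import path

fileobj = requests.get(url, stream=True).raw

# Two sinks; a long write timeout keeps a merely slow consumer subscribed.
streams = StreamDuplicator(fileobj, 2, timeout=30)

process = subprocess.Popen(['myapp'], stdin=streams[1])

s3 = boto3.resource('s3')
s3.Bucket('my-bucket').upload_fileobj(streams[0], 'target-file-name')
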
answered by Leon