
Using pipes to capture things printed to STDERR into Python variable from Jupyter

I'm trying to capture what gets printed to STDERR when I run commands in a Jupyter notebook. In particular, I'm using TensorFlow, whose C parts call fprintf; that output normally goes to the console, but I want to save it into a Python variable.

I've been using FDRedirector from the IPython codebase, which sets up an os.pipe to capture output into a Python string.

However, the problem with this code is that it hangs the kernel for large enough output. I suspect it hangs for output over 64 KiB (65536 bytes), since that's the pipe buffer size on Linux, and gdb shows the hang happening in write. Does anyone have a solution that works with larger outputs?
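
For reference, the limit is easy to check directly. Here is a minimal sketch, assuming Linux; F_GETPIPE_SZ is a Linux-specific fcntl that Python's fcntl module only exposes from 3.10 on, so the raw constant is used as a fallback:

import fcntl
import os

# Query the kernel pipe buffer size (Linux-specific fcntl).
F_GETPIPE_SZ = getattr(fcntl, 'F_GETPIPE_SZ', 1032)

r, w = os.pipe()
print(fcntl.fcntl(w, F_GETPIPE_SZ))  # typically 65536, i.e. 64 KiB

# Writing more than this without a concurrent reader blocks forever;
# that is exactly the single-process deadlock described above.
# os.write(w, b'x' * 65537)  # would hang: nothing drains the pipe
os.close(r)
os.close(w)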

As an example of what I'm doing right now, using FDRedirector:

STDERR = 2
redirect = FDRedirector(STDERR)  # FDRedirector taken from the IPython codebase

import tensorflow as tf
sess = tf.Session("")
node = tf.Print(tf.constant(1), [tf.constant(1)], "longstringlongstring")

def print_to_stderr():
    sess.run(node)   # tf.Print writes its message to stderr from the C++ side

redirect.start()
print_to_stderr()
captured_stderr = redirect.stop()

At the end, captured_stderr contains everything printed to stderr, including longstringlongstring. If you make the longstring part much longer (over 100k characters), this freezes.

asked Dec 19 '16 by Yaroslav Bulatov


3 Answers

This code tries to mimic Jupyter's %%capture magic by returning an IPython.utils.capture.CapturedIO instance.

It uses tempfile.TemporaryFile to store the bytes written to stdout and stderr. The __init__ method makes sure sys.stdout is the same object as sys.__stdout__ within the with block, and does the same for sys.stderr. The __exit__ method first syncs data to the temporary files with os.fdatasync, then reads the bytes back from the temporary files and writes them into StringIO instances.

The returned IPython.utils.capture.CapturedIO instance exposes stdout and stderr properties and a show method, and is itself callable.

from io import StringIO
import os
import sys
from tempfile import TemporaryFile

from IPython.utils.capture import CapturedIO

class Capture(object):
    def __init__(self, stdout=True, stderr=True):
        self.old_stdout = self.old_stderr = self.stdout = self.stderr = None
        if stdout:
            stdout = StringIO()
            if hasattr(sys.__stdout__, 'fileno'):
                self.stdout = sys.__stdout__.fileno()
                # make sys.stdout and sys.__stdout__ same
                if sys.stdout != sys.__stdout__:
                    self.old_stdout = sys.stdout
                    sys.stdout = sys.__stdout__

            elif hasattr(sys.stdout, 'fileno'):
                self.stdout = sys.stdout.fileno()

            self.stdout_clone = os.dup(self.stdout)
            self.stdout_temp_file = TemporaryFile()

        if stderr:
            stderr = StringIO()
            if hasattr(sys.__stderr__, 'fileno'):
                self.stderr = sys.__stderr__.fileno()
                if sys.stderr != sys.__stderr__:
                    self.old_stderr = sys.stderr
                    sys.stderr = sys.__stderr__

            elif hasattr(sys.stderr, 'fileno'):
                self.stderr = sys.stderr.fileno()

            self.stderr_clone = os.dup(self.stderr)
            self.stderr_temp_file = TemporaryFile()

        self.output = CapturedIO(stdout, stderr)

    def __enter__(self):
        if self.stdout is not None:
            os.dup2(self.stdout_temp_file.fileno(), self.stdout)

        if self.stderr is not None:
            os.dup2(self.stderr_temp_file.fileno(), self.stderr)        

        return self.output

    def __exit__(self, exc_type, val, tb):
        if self.stdout is not None:
            # sync data only
            os.fdatasync(self.stdout)
            # if sys.stdout was replaced, restore original
            if self.old_stdout is not None:
                sys.stdout = self.old_stdout
            # bytes written
            nbytes = os.lseek(self.stdout, 0, os.SEEK_CUR)
            # rewind
            os.lseek(self.stdout, 0, os.SEEK_SET)
            # read from temp file and write to StringIO
            self.output._stdout.write(os.read(self.stdout, nbytes).decode('utf-8'))
            # restore file descriptors
            os.dup2(self.stdout_clone, self.stdout)
            os.close(self.stdout_clone)

        if self.stderr is not None:
            os.fdatasync(self.stderr)
            if self.old_stderr is not None:
                sys.stderr = self.old_stderr
            nbytes = os.lseek(self.stderr, 0, os.SEEK_CUR)
            os.lseek(self.stderr, 0, os.SEEK_SET)
            self.output._stderr.write(os.read(self.stderr, nbytes).decode('utf-8'))
            os.dup2(self.stderr_clone, self.stderr)
            os.close(self.stderr_clone)

        if exc_type is not None:
            return False 

Test:

import tensorflow as tf

sess = tf.Session("")
node = tf.Print(tf.constant(1), [tf.constant(1)], "a" * 1024 ** 2)

with Capture() as output:
    print(sess.run(node))

print('stdout: {}\nstderr: {}'.format(output.stdout, len(output.stderr)))

Output:

stdout: 1

stderr: 1048625
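
Since CapturedIO is callable (its __call__ is an alias for show), the captured output can also be replayed later in the notebook:

output.show()  # re-display captured stdout/stderr in the current cell
output()       # same effect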
answered Sep 19 '22 by Nizam Mohamed


You can try redirecting your output to a temp file, so there are no pipe buffer limits:

STDERR = 2
STDOUT = 1
import os
import tempfile

class captured:
    def __init__(self, fd=STDERR):
        self.fd = fd
        self.prevfd = None

    def __enter__(self):
        t = tempfile.NamedTemporaryFile()
        print('Piping your output to ' + t.name)
        self.prevfd = os.dup(self.fd)   # remember the original descriptor
        os.dup2(t.fileno(), self.fd)    # point the descriptor at the temp file
        return t

    def __exit__(self, exc_type, exc_value, traceback):
        os.dup2(self.prevfd, self.fd)   # restore the original descriptor

with captured(fd=STDOUT) as tmp:
    os.system('cat 1mbfile.txt')

print("Captured:", open(tmp.name).read())

Please let me know if this works for your purpose. Unfortunately I don't have TF installed.

Jupyter itself survived outputting 1 MB to a cell :)

answered Sep 20 '22 by Ivan


The problem with FDRedirector is that it reads from the pipe only once. Since both the producer and the consumer end of the pipe live in the same process, the writing end blocks once the pipe buffer is full. The solution is to read continuously from the read end so the other side never blocks. One way to do this is to spawn a consumer thread.

Here is a piece of code that does just that: it redirects stderr to a pipe and reads continuously using a thread.

Update: The code is now for Python 3, as suggested by the OP's comment. Assuming you want the captured output in a Python 3 string (i.e. a Unicode string), redirect now converts the bytes read into a string. For that it accepts encoding and errors arguments, just like Python's decode (and with the same defaults). This addresses a real problem for the general use case: if you want to access the captured data as a string for further processing, you have to know the encoding in which stderr is written. This is also true for the approach in the other answer, where the stream is redirected to a file. If you are fine with just the bytes, you could modify the code to use an io.BytesIO.

import os, sys, threading
from contextlib import contextmanager
from io import StringIO

STDOUT, STDERR = 1, 2    

@contextmanager
def redirect(fd, encoding="utf-8", errors="strict"):
    # Save original handle so we can restore it later.
    saved_handle = os.dup(fd)

    # Redirect `fd` to the write end of the pipe.
    pipe_read, pipe_write = os.pipe()
    os.dup2(pipe_write, fd)
    os.close(pipe_write)

    # This thread reads from the read end of the pipe.
    def consumer_thread(f, data):
        while True:
            buf = os.read(f, 1024)
            if not buf: break
            data.write(buf.decode(encoding, errors))
        os.close(f)
        return

    # Spawn consumer thread, and give it a mutable `data` item to
    # store the redirected output.
    data = StringIO()
    thread = threading.Thread(target=consumer_thread, args=(pipe_read, data))
    thread.start()

    yield data

    # Cleanup: flush streams, restore `fd`
    { STDERR: sys.stderr, STDOUT: sys.stdout}[fd].flush()
    os.dup2(saved_handle, fd)
    os.close(saved_handle)
    thread.join()
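
If raw bytes are enough, as mentioned in the update, the decoding step can be dropped; a minimal sketch of the changed parts, using io.BytesIO in place of StringIO:

from io import BytesIO

# Bytes-only variant of the consumer: store exactly what was read.
def consumer_thread(f, data):
    while True:
        buf = os.read(f, 1024)
        if not buf:
            break
        data.write(buf)  # keep raw bytes; no decoding needed
    os.close(f)

data = BytesIO()  # pass this to the thread instead of a StringIO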

The redirect context manager can then be used to temporarily redirect any stream into an internal buffer. Note that under Jupyter, sys.stderr is not connected to STDERR; it is an ipykernel.iostream.OutStream object that emulates a Python file. So we have to os.write to STDERR to see the redirection at work.

if __name__ == "__main__":
    with redirect(STDERR) as data:
        os.write(STDERR, 100000 * b"Some xdata\n")
    print("captured output:", len(data.getvalue()))
    # The output from the next line will not be visible
    # in a Jupyter notebook, but in the kernel's console.
    os.write(STDERR, b"stderr is good again")
answered Sep 19 '22 by fpbhb