I'm trying to capture what gets printed to STDERR when I run commands in a Jupyter notebook. In particular, I'm using TensorFlow, which calls fprintf from its C parts; that output normally goes to the console, but I want to save it into a Python variable.
I've been using FDRedirector from the IPython codebase, which sets up os.pipe to capture output into a Python string.
However, the problem with this code is that it hangs the kernel for large enough output. I suspect it will hang for any output over 65536 bytes, since that's the default pipe buffer size on Linux, and gdb shows the hang happening in write. Does anyone have a solution that would work with larger outputs?
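To illustrate the suspected deadlock independently of TensorFlow, here is a minimal sketch: a single process that writes to its own os.pipe, with nobody reading the other end, blocks once the kernel buffer fills.
import os

r, w = os.pipe()
os.write(w, b"x" * 65536)  # fills the default 64 KiB pipe buffer on Linux
# os.write(w, b"x")        # one more byte would block here forever,
#                          # since nobody reads the other end of the pipe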
As an example of what I'm doing right now, using FDRedirector:
STDERR = 2
redirect = FDRedirector(STDERR)  # FDRedirector copied from the IPython codebase

import tensorflow as tf
sess = tf.Session("")
node = tf.Print(tf.constant(1), [tf.constant(1)], "longstringlongstring")

def print_to_stderr():
    sess.run(node)  # this prints to stderr

redirect.start()
print_to_stderr()
captured_stderr = redirect.stop()
At the end, captured_stderr contains everything printed to stderr, including longstringlongstring. If you make the longstring part much longer (>100k characters), this freezes the kernel.
This code tries to mimic Jupyter's %%capture magic by returning an IPython.utils.capture.CapturedIO instance. It makes use of tempfile.TemporaryFile to store the bytes written to stdout and stderr, so there is no pipe buffer to fill. The __init__ method makes sure sys.stdout is the same object as sys.__stdout__ within the with block, and likewise for sys.stderr. The __exit__ method first syncs data to the temporary files with os.fdatasync, then reads the bytes back from the temporary files and writes them into StringIO instances.
The returned IPython.utils.capture.CapturedIO instance sports stdout and stderr properties and a show method, and the instance itself is callable.
from io import StringIO
import os
import sys
from tempfile import TemporaryFile
from IPython.utils.capture import CapturedIO
class Capture(object):

    def __init__(self, stdout=True, stderr=True):
        self.old_stdout = self.old_stderr = self.stdout = self.stderr = None
        if stdout:
            stdout = StringIO()
            if hasattr(sys.__stdout__, 'fileno'):
                self.stdout = sys.__stdout__.fileno()
                # make sys.stdout and sys.__stdout__ same
                if sys.stdout != sys.__stdout__:
                    self.old_stdout = sys.stdout
                    sys.stdout = sys.__stdout__
            elif hasattr(sys.stdout, 'fileno'):
                self.stdout = sys.stdout.fileno()
            self.stdout_clone = os.dup(self.stdout)
            self.stdout_temp_file = TemporaryFile()
        if stderr:
            stderr = StringIO()
            if hasattr(sys.__stderr__, 'fileno'):
                self.stderr = sys.__stderr__.fileno()
                if sys.stderr != sys.__stderr__:
                    self.old_stderr = sys.stderr
                    sys.stderr = sys.__stderr__
            elif hasattr(sys.stderr, 'fileno'):
                self.stderr = sys.stderr.fileno()
            self.stderr_clone = os.dup(self.stderr)
            self.stderr_temp_file = TemporaryFile()
        self.output = CapturedIO(stdout, stderr)

    def __enter__(self):
        # point the real file descriptors at the temporary files
        if self.stdout is not None:
            os.dup2(self.stdout_temp_file.fileno(), self.stdout)
        if self.stderr is not None:
            os.dup2(self.stderr_temp_file.fileno(), self.stderr)
        return self.output

    def __exit__(self, exc_type, val, tb):
        if self.stdout is not None:
            # sync data only
            os.fdatasync(self.stdout)
            # if sys.stdout was replaced, restore the original
            if self.old_stdout is not None:
                sys.stdout = self.old_stdout
            # bytes written
            nbytes = os.lseek(self.stdout, 0, os.SEEK_CUR)
            # rewind
            os.lseek(self.stdout, 0, os.SEEK_SET)
            # read from the temp file and write to the StringIO
            self.output._stdout.write(os.read(self.stdout, nbytes).decode('utf-8'))
            # restore file descriptors
            os.dup2(self.stdout_clone, self.stdout)
            os.close(self.stdout_clone)
        if self.stderr is not None:
            os.fdatasync(self.stderr)
            if self.old_stderr is not None:
                sys.stderr = self.old_stderr
            nbytes = os.lseek(self.stderr, 0, os.SEEK_CUR)
            os.lseek(self.stderr, 0, os.SEEK_SET)
            self.output._stderr.write(os.read(self.stderr, nbytes).decode('utf-8'))
            os.dup2(self.stderr_clone, self.stderr)
            os.close(self.stderr_clone)
        if exc_type is not None:
            return False
Test:
import tensorflow as tf
sess = tf.Session("")
node = tf.Print(tf.constant(1), [tf.constant(1)], "a" * 1024 ** 2)
with Capture() as output:
    print(sess.run(node))

print('stdout: {}\nstderr: {}'.format(output.stdout, len(output.stderr)))
Output:
stdout: 1
stderr: 1048625
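For completeness, a quick usage sketch of the returned CapturedIO instance (using the Capture class above):
with Capture() as output:
    print('hello')            # intercepted at the file descriptor level

output.show()           # replays the captured stdout/stderr into the cell
text = output.stdout    # captured stdout as a string
output()                # calling the instance is equivalent to show()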
You can try piping your output to a temp file, so there are no buffer limits:
STDERR=2
STDOUT=1
import os
import sys
import tempfile
class captured:
    def __init__(self, fd=STDERR):
        self.fd = fd
        self.prevfd = None

    def __enter__(self):
        t = tempfile.NamedTemporaryFile()
        print('Piping your output to ' + t.name)
        self.prevfd = os.dup(self.fd)      # remember the original descriptor
        os.dup2(t.fileno(), self.fd)       # point fd at the temp file
        return t

    def __exit__(self, exc_type, exc_value, traceback):
        os.dup2(self.prevfd, self.fd)      # restore the original descriptor
        os.close(self.prevfd)

with captured(fd=STDOUT) as tmp:
    os.system('cat 1mbfile.txt')

print("Captured:", open(tmp.name).read())
Please let me know if this works for your purpose. Unfortunately I don't have TF installed.
Jupyter itself survived outputting 1 MB to a cell :)
The problem with FDRedirector is that it reads from the pipe only once. Since both the producer and the consumer end of the pipe live in the same process, the writing end blocks as soon as the pipe buffer is full. The solution is to read continuously from the read end so the writer never blocks. One way to do that is to spawn a consumer thread.
Here is a piece of code that does just that: it redirects stderr to a pipe and reads from it continuously using a thread.
Update: The code is now for Python 3, as suggested by the OP's comment. Assuming that you want the captured output in a Python 3 string (i.e. a Unicode string), redirect now converts the bytes read into a string. For that it accepts encoding and errors arguments, just like Python's bytes.decode (and with the same defaults). This addresses a real problem for the general use case: if you want to access the captured data as a string for further processing, you have to know the encoding in which stderr is written. The same is true for the approach in the other answer, where the stream is redirected to a file. If you are fine with just the bytes, you could modify the code to use an io.BytesIO (see the sketch at the end of this answer).
import os, sys, threading
from contextlib import contextmanager
from io import StringIO

STDOUT, STDERR = 1, 2

@contextmanager
def redirect(fd, encoding="utf-8", errors="strict"):
    # Save original handle so we can restore it later.
    saved_handle = os.dup(fd)

    # Redirect `fd` to the write end of the pipe.
    pipe_read, pipe_write = os.pipe()
    os.dup2(pipe_write, fd)
    os.close(pipe_write)

    # This thread reads from the read end of the pipe.
    def consumer_thread(f, data):
        while True:
            buf = os.read(f, 1024)
            if not buf:
                break
            data.write(buf.decode(encoding, errors))
        os.close(f)

    # Spawn consumer thread, and give it a mutable `data` item to
    # store the redirected output.
    data = StringIO()
    thread = threading.Thread(target=consumer_thread, args=(pipe_read, data))
    thread.start()

    yield data

    # Cleanup: flush streams, restore `fd`.
    {STDERR: sys.stderr, STDOUT: sys.stdout}[fd].flush()
    os.dup2(saved_handle, fd)   # closes the pipe's write end; reader sees EOF
    os.close(saved_handle)
    thread.join()
The redirect context manager can then be used to temporarily redirect any stream to an internal buffer. Note that under Jupyter, sys.stderr is not connected to STDERR; it is an ipykernel.iostream.OutStream object emulating a Python file. So we have to os.write to STDERR to see the redirection at work.
if __name__ == "__main__":
    with redirect(STDERR) as data:
        os.write(STDERR, 100000 * b"Some xdata\n")

    print("captured output:", len(data.getvalue()))

    # The output from the next line will not be visible
    # in a Jupyter notebook, but in the kernel's console.
    os.write(STDERR, b"stderr is good again")