Is it possible to strace a python function for opened files, and differentiate if they were opened by python or a subprocess?
# Sketch of the desired API. ``strace_read`` is hypothetical: it stands for a
# decorator that appends every file opened while the decorated function runs
# to one of the two lists, depending on who opened it.
read_python, read_external = [], []


@strace_read(read_python, read_external)
def test():
    # Opened by the Python process itself.
    with open("foo.txt", "r") as file:
        pass
    # Opened by a child process.
    subprocess.call(["cat", "bar.txt"])


# The function has to run before the lists can contain anything.
test()

for file in read_python:
    print("python: ", file)
for file in read_external:
    print("external: ", file)
So the output is as:
>>> python: foo.txt
>>> external: bar.txt
I'm most interested in using a decorator. Differentiating isn't a priority.
Conceptually, my best guess is to replace instances of load_function(open)
with wrappers ... actually, I have no idea; there are too many ways to access open.
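I'd solve it in a much simpler way, but with a similar result. Instead of figuring out how to enable strace on a single function, create a decorator like this, which brackets each call with two recognizable marker open() attempts: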
def strace_mark(f):
    """Bracket every call to *f* with two marker ``open()`` attempts.

    Before the call the wrapper tries to open ``function-<name>-start`` and
    after it ``function-<name>-end`` (``<name>`` is ``f.__name__``), both in
    read mode. Failures to open are ignored: the attempt itself is the point,
    because it shows up as a syscall in an ``strace`` log and delimits the
    portion of the trace that belongs to *f*.

    Args:
        f: The callable to wrap.

    Returns:
        A wrapper that forwards all arguments to *f* and returns its result.
    """
    import functools

    def _emit_marker(path):
        # A missing marker file is the expected case; if the file happens to
        # exist, the ``with`` block closes it again instead of leaking it.
        try:
            with open(path, 'r'):
                pass
        except OSError:
            pass

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        _emit_marker('function-%s-start' % f.__name__)
        try:
            return f(*args, **kwargs)
        finally:
            # Emit the end marker even when f raises, so that the start/end
            # pairs in the trace stay balanced.
            _emit_marker('function-%s-end' % f.__name__)

    # Without this return the decorated name would be bound to None.
    return wrapper
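Then run the whole program under strace -e file and keep only the part of the trace between open(function-something-start) and open(function-something-end). The marker files normally do not exist, so the opens fail, but the attempts still show up in the trace.
If you do strace -f, you get the python/external separation for free. Just look at which pid makes each call.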
This is the solution I used:
#!/usr/bin/env python3
import multiprocessing
import selectors
import os
import array
import fcntl
import termios
import subprocess
import decorator
import locale
import io
import codecs
import re
import collections
def strace(function):
    """Decorate *function* so that each call is traced with ``strace``.

    For every call a helper process is started which attaches ``strace`` to
    the calling process (``-p <pid> -f``), collects the ``open``/``openat``
    calls reported while *function* runs, and sends them back over a pipe.

    Args:
        function: The callable to trace.

    Returns:
        The decorated callable. Calling it returns a ``StraceReturn`` named
        tuple ``(return_data, pid, strace_data)``: the value returned by
        *function*, the pid of the calling process, and a list of
        ``(pid, path)`` tuples, one per reported open call.
    """
    # ``signal`` is needed for SIGKILL below; imported here because it is not
    # among the file-level imports.
    import signal

    StraceReturn = collections.namedtuple(
        "StraceReturn", ["return_data", "pid", "strace_data"]
    )

    def strace_filter(stracefile, pid, exclude_system=False):
        """Yield ``(pid, path)`` for every open call found in *stracefile*.

        Args:
            stracefile: Iterable of strace output lines (``-xx`` format, i.e.
                every string is written as ``\\xNN`` escapes).
            pid: Pid to report for lines that carry no ``[pid N]`` prefix.
            exclude_system: If true, skip paths below the listed system
                directories.
        """
        system = (
            "/bin",
            "/boot",
            "/dev",
            "/etc",
            "/lib",
            "/proc",
            "/root",
            "/run",
            "/sbin",
            "/srv",
            "/sys",
            "/tmp",
            "/usr",
            "/var",
        )
        encoding = locale.getpreferredencoding(False)
        # Matches both ``open("...", `` and ``openat(AT_FDCWD, "...", ``;
        # current C libraries implement open() through the openat syscall.
        open_call = re.compile(
            r'^(?:\[pid\s+(\d+)\]\s+)?open(?:at)?\((?:[^,"]+,\s*)?'
            r'"((?:\\x[0-9a-f]{2})+)",',
            re.IGNORECASE,
        )
        for line in stracefile:
            match = open_call.search(line)
            if not match:
                continue
            line_pid, raw_path = match.groups(pid)
            # Turn the ``\xNN`` escapes back into bytes, then into text.
            path = codecs.escape_decode(raw_path.encode("ascii"))[0].decode(encoding)
            if exclude_system and path.startswith(system):
                continue
            # The prefix is parsed as text while the default is an int;
            # normalise so the value can be compared with StraceReturn.pid.
            yield (int(line_pid), path)

    def strace_reader(conn_parent, conn_child, barrier, pid):
        """Run ``strace`` against *pid* and report the opened files.

        Executed in the helper process. Reads strace's stderr until the
        traced side sends a message on *conn_child*, then stops strace,
        filters the collected output and sends each ``(pid, path)`` tuple
        back over *conn_child* before closing it.
        """
        conn_parent.close()
        encoding = locale.getpreferredencoding(False)
        strace_args = [
            "strace", "-e", "trace=open,openat", "-f", "-s", "512", "-xx",
            "-p", str(pid),
        ]
        process_data = io.StringIO()
        process = subprocess.Popen(
            strace_args,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        selector = selectors.DefaultSelector()
        selector.register(process.stderr, selectors.EVENT_READ)
        # Wait for strace's first output before releasing the traced side.
        selector.select()
        barrier.wait()
        selector.register(conn_child, selectors.EVENT_READ)
        while len(selector.get_map()):
            events = selector.select()
            for key, mask in events:
                if key.fd == conn_child.fileno():
                    # The traced function has finished: stop strace.
                    conn_child.recv()
                    selector.unregister(key.fd)
                    process.terminate()
                    try:
                        process.wait(5)
                    except subprocess.TimeoutExpired:
                        # Popen.wait() raises TimeoutExpired, which is not a
                        # subclass of the builtin TimeoutError.
                        process.kill()
                        process.wait()
                else:
                    # Ask how much is pending so one read can drain it.
                    ioctl_buffer = array.array("i", [0])
                    try:
                        fcntl.ioctl(key.fd, termios.FIONREAD, ioctl_buffer)
                    except OSError:
                        read_bytes = 1024
                    else:
                        read_bytes = max(1024, ioctl_buffer[0])
                    data = os.read(key.fd, read_bytes)
                    if data:
                        # All output is buffered and filtered after the loop;
                        # simpler than filtering incrementally, at the cost
                        # of memory.
                        process_data.write(data.decode(encoding))
                    else:
                        # EOF: strace closed its stderr.
                        selector.unregister(key.fd)
        selector.close()
        process_data.seek(0, io.SEEK_SET)
        for pidfile in strace_filter(process_data, pid):
            conn_child.send(pidfile)
        conn_child.close()

    def strace_wrapper(function, *args, **kw):
        """Call ``function(*args, **kw)`` while the helper process traces it."""
        strace_data = list()
        # The helper's target is a nested function, which cannot be pickled,
        # so the helper must be created with the "fork" start method.
        context = multiprocessing.get_context("fork")
        barrier = context.Barrier(2)
        conn_parent, conn_child = context.Pipe(duplex=True)
        process = context.Process(
            target=strace_reader,
            args=(conn_parent, conn_child, barrier, os.getpid()),
        )
        process.start()
        conn_child.close()
        # Do not start the function before strace has produced output.
        barrier.wait()
        try:
            function_return = function(*args, **kw)
        finally:
            # Always tell the helper to stop, even if the function raised;
            # otherwise it would wait forever with strace still attached.
            conn_parent.send(None)
            while True:
                try:
                    strace_data.append(conn_parent.recv())
                except EOFError:
                    break
            process.join(5)
            if process.is_alive():
                process.terminate()
                process.join(5)
                if process.is_alive():
                    os.kill(process.pid, signal.SIGKILL)
                    process.join()
            conn_parent.close()
        return StraceReturn(function_return, os.getpid(), strace_data)

    return decorator.decorator(strace_wrapper, function)
@strace
def test():
    """Demo workload: one file opened by a child process, one by Python."""
    print("Entering test()")
    # Started through the shell so the open happens in another process.
    cat_proc = subprocess.Popen("cat +μυρτιὲς.txt", shell=True)
    # Opened (and immediately closed) by this process; the embedded quote
    # exercises the trace parser.
    with open("test\"test", "r"):
        pass
    cat_proc.wait()
    print("Exiting test()")
    return 5


print(test())
Note that any information strace generates after the termination event will still be collected. To avoid that, use a "while not signaled" loop, and terminate the subprocess after the loop (the FIONREAD ioctl is a holdover from this case; I didn't see any reason to remove it).
In hindsight, the decorator could have been greatly simplified had I used a temporary file, rather than multiprocessing/pipe.
A child process is forked to then fork strace - in other words, strace is tracing its grandparent. Some Linux distributions only allow strace to trace its children. I'm not sure how to work around this restriction - having the main program continue executing in the child fork (while the parent execs strace) is probably a bad idea - the program will trade PIDs like a hot potato if the decorated functions are used too often.
If you love us, you can donate to us via PayPal or buy us a coffee so we can maintain and grow! Thank you!
Donate Us With