I need some help figuring out how to automate a benchmarking effort in Python.
I'm testing the effects of threading on BLAS library calls made through numpy in Python. In a Linux environment, threading in OpenBLAS is controlled through the environment variable OMP_NUM_THREADS. I want to run a test where I increment OMP_NUM_THREADS from 1 to a max value, time a routine at each thread count, and then finally manipulate the aggregate timing for all thread counts.
The issue is the following. Environment variables can be set in python, but they only affect subprocesses or subshells. So I can correctly run my benchmark with the following driver code:
#!/usr/bin/env python
# driver script for thread test
import os
thread_set = [1, 2, 4, 8, 16]
for thread in thread_set:
    os.environ['OMP_NUM_THREADS'] = '{:d}'.format(thread)
    os.system("echo $OMP_NUM_THREADS")
    os.system("numpy_test")
and numpy_test script:
#!/usr/bin/env python
#timing test for numpy dot product (using OpenBLAS)
#based on http://stackoverflow.com/questions/11443302/compiling-numpy-with-openblas-integration
import sys
import timeit
setup = "import numpy; x = numpy.random.random((1000,1000))"
count = 5
t = timeit.Timer("numpy.dot(x, x.T)", setup=setup)
dot_time = t.timeit(count)/count
print("dot: {:7.3g} sec".format(dot_time))
but analyzing this is a very manual process.
In particular, I can't return the value dot_time from numpy_test up to my outer wrapper routine, so I can't analyze the results of my test in any automated fashion. As an example, I'd like to plot dot_time vs. number of threads, or evaluate whether dot_time/number of threads is constant.
If I try to do a similar test entirely within a Python instance by defining a Python test function (avoiding the os.system() approach above), and then running the test function within the thread in thread_set loop, then all instances of the test function inherit the same value for OMP_NUM_THREADS (that of the parent Python shell). So this test fails:
#!/usr/bin/env python
#attempt at testing threads that doesn't work
#(always uses inherited value of OMP_NUM_THREADS)
import os
import sys
import timeit
def test_numpy():
    setup = "import numpy; x = numpy.random.random((1000,1000))"
    count = 5
    t = timeit.Timer("numpy.dot(x, x.T)", setup=setup)
    dot_time = t.timeit(count)/count
    print("dot: {:7.3g} sec".format(dot_time))
    return dot_time
thread_set = [1, 2, 4, 8, 16]
for thread in thread_set:
    os.environ['OMP_NUM_THREADS'] = '{:d}'.format(thread)
    os.system("echo $OMP_NUM_THREADS")
    time_to_run = test_numpy()
    print(time_to_run)
This fails in that every value of thread takes the same time, as test_numpy() always inherits the value of OMP_NUM_THREADS in the parent environment rather than the value set through os.environ. If something like this worked, however, it would be trivial to do the analysis I need to do.
In the real test, I'll be running over a few thousand permutations, so automation is key. Given that, I'd appreciate an answer to any of these questions:
How would you return a value (dot_time) from a subprocess like this? Is there a more elegant solution than reading/writing a file?
Is there a better way to structure this sort of (environment variable dependent) test?
Thank you in advance.
You can do something like this:
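import os
import subprocess
# inside the loop over thread_set, where thread is the current thread count
os.environ['OMP_NUM_THREADS'] = '{:d}'.format(thread)
proc = subprocess.Popen(["numpy_test"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# communicate() waits for the child to exit and returns its captured output
stdout, stderr = proc.communicate()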
Then you'll have the output of the numpy_test script in stdout. In general, I believe subprocess.call and subprocess.Popen are preferred over os.system.
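As a rough sketch of how this could be wrapped for a sweep over thread counts, the helper below passes a modified copy of the environment to the child through the env argument instead of changing os.environ in the parent. The name run_with_threads is hypothetical, subprocess.run assumes Python 3.5 or later, and the command is whatever benchmark script you want to launch (numpy_test in the question).

```python
import os
import subprocess
import sys


def run_with_threads(cmd, n_threads):
    """Run cmd in a child process with OMP_NUM_THREADS set; return its stdout as text.

    run_with_threads is a hypothetical helper name, not part of any library.
    """
    # Copy the parent's environment and override one variable for the child only.
    env = dict(os.environ, OMP_NUM_THREADS=str(n_threads))
    result = subprocess.run(
        cmd,
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,  # decode the child's output to str
        check=True,               # raise CalledProcessError on a non-zero exit
    )
    return result.stdout


if __name__ == "__main__":
    # Stand-in child that just echoes the variable; swap in ["numpy_test"]
    # (assumed to be on PATH) to run the real benchmark.
    child = [sys.executable, "-c",
             "import os; print(os.environ['OMP_NUM_THREADS'])"]
    for n in [1, 2, 4, 8, 16]:
        print(n, run_with_threads(child, n).strip())
```

Because each child is a fresh process, the BLAS library reads the variable at start-up, and the collected outputs can be stored in a list or dict in the driver for later analysis.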
If you want to get the output from the subprocess, use subprocess.check_output, e.g. replace
os.system("numpy_test")
with
dot_output = subprocess.check_output(["numpy_test"])
dot_time = ... # extract time from dot_output
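One possible way to fill in the extraction step, assuming the child prints a line in the "dot: ... sec" format used by numpy_test in the question. The function name parse_dot_time and the regular expression are illustrative choices, not something the original script provides.

```python
import re

# Matches e.g. "dot:  0.0523 sec" or "dot: 1.2e-05 sec" (the "{:7.3g}" format).
_DOT_RE = re.compile(r"dot:\s*([-+]?\d*\.?\d+(?:[eE][-+]?\d+)?)\s*sec")


def parse_dot_time(output):
    """Return the timing in seconds parsed from the benchmark's printed output.

    parse_dot_time is a hypothetical helper; it accepts bytes (what
    subprocess.check_output returns by default on Python 3) or str.
    """
    if isinstance(output, bytes):
        output = output.decode()
    match = _DOT_RE.search(output)
    if match is None:
        raise ValueError("no 'dot: ... sec' line found in: {!r}".format(output))
    return float(match.group(1))
```

With this in place, the placeholder line could become dot_time = parse_dot_time(dot_output), and the resulting floats can be collected per thread count for plotting. Printing the bare number from the child, or printing JSON, would make the parsing simpler still if you control the benchmark script.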