I am using Python 2 subprocess with threading threads to take standard input, process it with binaries A, B, and C, and write modified data to standard output. This script (let's call it A_to_C.py) is very slow and I'd like to learn how to fix it. The general flow is as follows:
import sys, subprocess, threading

A_process = subprocess.Popen(['A', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
produce_A_thread = threading.Thread(target=produceA, args=(sys.stdin, A_process.stdin))
B_process = subprocess.Popen(['B', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
convert_A_to_B_thread = threading.Thread(target=produceB, args=(A_process.stdout, B_process.stdin))
C_process = subprocess.Popen(['C', '-'], stdin=subprocess.PIPE)
convert_B_to_C_thread = threading.Thread(target=produceC, args=(B_process.stdout, C_process.stdin))
produce_A_thread.start()
convert_A_to_B_thread.start()
convert_B_to_C_thread.start()
produce_A_thread.join()
convert_A_to_B_thread.join()
convert_B_to_C_thread.join()
A_process.wait()
B_process.wait()
C_process.wait()
The idea is that standard input goes into A_to_C.py:

1. The A binary processes a chunk of standard input and creates A-output with the function produceA.
2. The B binary processes a chunk of A's standard output and creates B-output via the function produceB.
3. The C binary processes a chunk of B's standard output via the function produceC and writes C-output to standard output.

I did profiling with cProfile and nearly all of the time in this script appears to be spent in acquiring thread locks.
For instance, in a test 417s job, 416s (>99% of the total runtime) is spent on acquiring thread locks:
$ python
Python 2.6.6 (r266:84292, Nov 21 2013, 10:50:32)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import pstats
>>> p = pstats.Stats('1.profile')
>>> p.sort_stats('cumulative').print_stats(10)
Thu Jun 12 22:19:07 2014 1.profile
1755 function calls (1752 primitive calls) in 417.203 CPU seconds
Ordered by: cumulative time
List reduced from 162 to 10 due to restriction <10>
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.020 0.020 417.203 417.203 A_to_C.py:90(<module>)
1 0.000 0.000 417.123 417.123 A_to_C.py:809(main)
6 0.000 0.000 416.424 69.404 /foo/python/2.7.3/lib/python2.7/threading.py:234(wait)
32 416.424 13.013 416.424 13.013 {method 'acquire' of 'thread.lock' objects}
3 0.000 0.000 416.422 138.807 /foo/python/2.7.3/lib/python2.7/threading.py:648(join)
3 0.000 0.000 0.498 0.166 A_to_C.py:473(which)
37 0.000 0.000 0.498 0.013 A_to_C.py:475(is_exe)
3 0.496 0.165 0.496 0.165 {posix.access}
6 0.000 0.000 0.194 0.032 /foo/python/2.7.3/lib/python2.7/subprocess.py:475(_eintr_retry_call)
3 0.000 0.000 0.191 0.064 /foo/python/2.7.3/lib/python2.7/subprocess.py:1286(wait)
What am I doing wrong with my threading.Thread and/or subprocess.Popen arrangement which is causing this issue?
I think you are just being misled by the way cProfile works. For example, here's a simple script that uses two threads:
#!/usr/bin/python
import threading
import time
def f():
    time.sleep(10)

def main():
    t = threading.Thread(target=f)
    t.start()
    t.join()
If I test this using cProfile, here's what I get:
>>> import test
>>> import cProfile
>>> cProfile.run('test.main()')
60 function calls in 10.011 seconds
Ordered by: standard name
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 10.011 10.011 <string>:1(<module>)
1 0.000 0.000 10.011 10.011 test.py:10(main)
1 0.000 0.000 0.000 0.000 threading.py:1008(daemon)
2 0.000 0.000 0.000 0.000 threading.py:1152(currentThread)
2 0.000 0.000 0.000 0.000 threading.py:241(Condition)
2 0.000 0.000 0.000 0.000 threading.py:259(__init__)
2 0.000 0.000 0.000 0.000 threading.py:293(_release_save)
2 0.000 0.000 0.000 0.000 threading.py:296(_acquire_restore)
2 0.000 0.000 0.000 0.000 threading.py:299(_is_owned)
2 0.000 0.000 10.011 5.005 threading.py:308(wait)
1 0.000 0.000 0.000 0.000 threading.py:541(Event)
1 0.000 0.000 0.000 0.000 threading.py:560(__init__)
2 0.000 0.000 0.000 0.000 threading.py:569(isSet)
4 0.000 0.000 0.000 0.000 threading.py:58(__init__)
1 0.000 0.000 0.000 0.000 threading.py:602(wait)
1 0.000 0.000 0.000 0.000 threading.py:627(_newname)
5 0.000 0.000 0.000 0.000 threading.py:63(_note)
1 0.000 0.000 0.000 0.000 threading.py:656(__init__)
1 0.000 0.000 0.000 0.000 threading.py:709(_set_daemon)
1 0.000 0.000 0.000 0.000 threading.py:726(start)
1 0.000 0.000 10.010 10.010 threading.py:911(join)
10 10.010 1.001 10.010 1.001 {method 'acquire' of 'thread.lock' objects}
2 0.000 0.000 0.000 0.000 {method 'append' of 'list' objects}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
4 0.000 0.000 0.000 0.000 {method 'release' of 'thread.lock' objects}
4 0.000 0.000 0.000 0.000 {thread.allocate_lock}
2 0.000 0.000 0.000 0.000 {thread.get_ident}
1 0.000 0.000 0.000 0.000 {thread.start_new_thread}
As you can see, it says that almost all of the time is spent acquiring locks. Of course, we know that's not really an accurate representation of what the script was doing. All the time was actually spent in a time.sleep call inside f(). The high tottime of the acquire call is just because join was waiting for f to finish, which means it had to sit and wait to acquire a lock. However, cProfile doesn't show any time being spent in f at all. We can clearly see what is actually happening because the example code is so simple, but in a more complicated program, this output is very misleading.
You can get more reliable results by using another profiling library, like yappi:
>>> import test
>>> import yappi
>>> yappi.set_clock_type("wall")
>>> yappi.start()
>>> test.main()
>>> yappi.get_func_stats().print_all()
Clock type: wall
Ordered by: totaltime, desc
name #n tsub ttot tavg
<stdin>:1 <module> 2/1 0.000025 10.00801 5.004003
test.py:10 main 1 0.000060 10.00798 10.00798
..2.7/threading.py:308 _Condition.wait 2 0.000188 10.00746 5.003731
..thon2.7/threading.py:911 Thread.join 1 0.000039 10.00706 10.00706
..ython2.7/threading.py:752 Thread.run 1 0.000024 10.00682 10.00682
test.py:6 f 1 0.000013 10.00680 10.00680
..hon2.7/threading.py:726 Thread.start 1 0.000045 0.000608 0.000608
..thon2.7/threading.py:602 _Event.wait 1 0.000029 0.000484 0.000484
..2.7/threading.py:656 Thread.__init__ 1 0.000064 0.000250 0.000250
..on2.7/threading.py:866 Thread.__stop 1 0.000025 0.000121 0.000121
..lib/python2.7/threading.py:541 Event 1 0.000011 0.000101 0.000101
..python2.7/threading.py:241 Condition 2 0.000025 0.000094 0.000047
..hreading.py:399 _Condition.notifyAll 1 0.000020 0.000090 0.000090
..2.7/threading.py:560 _Event.__init__ 1 0.000018 0.000090 0.000090
..thon2.7/encodings/utf_8.py:15 decode 2 0.000031 0.000071 0.000035
..threading.py:259 _Condition.__init__ 2 0.000064 0.000069 0.000034
..7/threading.py:372 _Condition.notify 1 0.000034 0.000068 0.000068
..hreading.py:299 _Condition._is_owned 3 0.000017 0.000040 0.000013
../threading.py:709 Thread._set_daemon 1 0.000018 0.000035 0.000035
..ding.py:293 _Condition._release_save 2 0.000019 0.000033 0.000016
..thon2.7/threading.py:63 Thread._note 7 0.000020 0.000020 0.000003
..n2.7/threading.py:1152 currentThread 2 0.000015 0.000019 0.000009
..g.py:296 _Condition._acquire_restore 2 0.000011 0.000017 0.000008
../python2.7/threading.py:627 _newname 1 0.000014 0.000014 0.000014
..n2.7/threading.py:58 Thread.__init__ 4 0.000013 0.000013 0.000003
..threading.py:1008 _MainThread.daemon 1 0.000004 0.000004 0.000004
..hon2.7/threading.py:569 _Event.isSet 2 0.000003 0.000003 0.000002
With yappi, it's much easier to see that the time is being spent in f.

I suspect that you'll find that in reality, most of your script's time is spent doing whatever work is being done in produceA, produceB, and produceC.
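If you want a quick check without switching profilers, you can also time each worker from inside its own thread. A minimal sketch (the timed() wrapper is mine, not part of your script):

import sys
import time

def timed(func):
    # Report the wall-clock time spent inside each worker, measured from
    # within its own thread (cProfile only instruments the calling thread).
    def wrapper(*args):
        start = time.time()
        func(*args)
        sys.stderr.write("%s: %.1fs\n" % (func.__name__, time.time() - start))
    return wrapper

# e.g. in A_to_C.py:
# produce_A_thread = threading.Thread(target=timed(produceA),
#                                     args=(sys.stdin, A_process.stdin))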
TL;DR If your program runs slower than expected, it is probably due to the details of what the intermediate functions do rather than due to IPC or threading. Test with mock functions and processes (as simple as possible) to isolate just the overhead of passing data to/from subprocesses. In a benchmark based closely on your code (below), the performance when passing data to/from subprocesses seems to be roughly equivalent to using shell pipes directly; Python is not particularly slow at this task.
The general form of the original code is:
def produceB(from_stream, to_stream):
    while True:
        buf = from_stream.read()
        processed_buf = do_expensive_calculation(buf)
        to_stream.write(processed_buf)
Here the calculation between the read and the write takes about 2/3 of the total CPU time of all processes (main and subprocesses) combined. Note that this is CPU time, not wall-clock time.
I think that this prevents the I/O from running at full speed. Reads, writes, and the calculation each need their own thread, with queues to provide buffering between the read and the calculation and between the calculation and the write (since the amount of buffering that pipes provide is, I believe, insufficient).
I show below that if there is no processing between read and write (or equivalently: if the intermediate processing is done in a separate thread), then the throughput from threads + subprocess is very high. It is also possible to have separate threads for reads and writes; this adds a bit of overhead but makes writes not block reads and vice versa. Three threads (read, write, and processing) is even better: then no step blocks the others (within the limits of the queue sizes, of course).
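As a sketch of that three-thread arrangement (the queue sizes are illustrative guesses, and do_expensive_calculation, A_process, and B_process are the names from the code above):

import threading
import Queue  # this is the Python 2 name; it became queue in Python 3

def reader(src, q, read_size=8 * 1024):
    # Pull blocks from the upstream pipe as fast as it produces them.
    while True:
        buf = src.read(read_size)
        q.put(buf)
        if len(buf) == 0:  # EOF; the empty string doubles as a sentinel
            break

def processor(in_q, out_q):
    # The expensive calculation lives here and never blocks the I/O threads
    # directly (only indirectly, via the bounded queues filling up).
    while True:
        buf = in_q.get()
        if len(buf) == 0:
            out_q.put(buf)  # forward the sentinel
            break
        out_q.put(do_expensive_calculation(buf))

def writer(q, dst):
    # Push processed blocks into the downstream pipe.
    while True:
        buf = q.get()
        if len(buf) == 0:
            break
        dst.write(buf)
    dst.close()

read_q = Queue.Queue(maxsize=16)   # buffer between read and calculation
write_q = Queue.Queue(maxsize=16)  # buffer between calculation and write
threads = [threading.Thread(target=reader, args=(A_process.stdout, read_q)),
           threading.Thread(target=processor, args=(read_q, write_q)),
           threading.Thread(target=writer, args=(write_q, B_process.stdin))]
for t in threads:
    t.start()
for t in threads:
    t.join()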
All benchmarking below is on Python 2.7.6 on Ubuntu 14.04 LTS 64-bit (Intel i7, Ivy Bridge, quad core). The test is to transfer approximately 1GB of data in 4KB blocks between two dd processes, passing the data through Python as an intermediary. The dd processes use medium-sized (4KB) blocks; typical text I/O would be smaller (unless it is cleverly buffered by the interpreter, etc.), and typical binary I/O would of course be much larger. I have one example based on exactly how you did this, and one example based on an alternate approach I tried some time ago (which turns out to be slower). By the way, thanks for posting this question; it is useful.
First, let's convert the original code in the question into a slightly simpler self-contained example. This is just two processes communicating with a thread that pumps data from one to the other, doing blocking reads and writes.
import subprocess, threading
A_process = subprocess.Popen(["dd", "if=/dev/zero", "bs=4k", "count=244140"], stdout=subprocess.PIPE)
B_process = subprocess.Popen(["dd", "of=/dev/null", "bs=4k"], stdin=subprocess.PIPE)
def convert_A_to_B(src, dst):
    read_size = 8*1024
    while True:
        try:
            buf = src.read(read_size)
            if len(buf) == 0:  # This is a bit hacky, but seems to reliably happen when the src is closed
                break
            dst.write(buf)
        except ValueError as e:  # Reading or writing on a closed fd causes ValueError, not IOError
            print str(e)
            break
convert_A_to_B_thread = threading.Thread(target=convert_A_to_B, args=(A_process.stdout, B_process.stdin))
convert_A_to_B_thread.start()
# Here, watch out for the exact sequence to clean things up
convert_A_to_B_thread.join()
A_process.wait()
B_process.stdin.close()
B_process.wait()
Results:
244140+0 records in
244140+0 records out
999997440 bytes (1.0 GB) copied, 0.638977 s, 1.6 GB/s
244140+0 records in
244140+0 records out
999997440 bytes (1.0 GB) copied, 0.635499 s, 1.6 GB/s
real 0m0.678s
user 0m0.657s
sys 0m1.273s
Not bad! It turns out that the ideal read size in this case is roughly 8KB-16KB; much smaller and much larger sizes are somewhat slower. This is probably related to the 4KB block size we asked dd to use.
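If you want to reproduce that sweep on your own system, a loop like this works; run_copy() here is a hypothetical wrapper around the dd-to-dd example above with read_size turned into a parameter:

import time

# Roughly 1GB is transferred per run, so GB/s is approximately 1/elapsed.
for read_size in [1024, 4096, 8192, 16384, 32768, 65536]:
    start = time.time()
    run_copy(read_size)  # hypothetical: the copy above, parameterized
    print "read_size=%6d: %.2f GB/s" % (read_size, 1.0 / (time.time() - start))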
When I was looking at this type of problem before, I headed in the direction of using select(), nonblocking I/O, and a single thread. An example of that is in my question here: How to read and write from subprocesses asynchronously?. That was for reading from two processes in parallel, which I have extended below to reading from one process and writing to another. The nonblocking writes are limited to PIPE_BUF or less in size, which is 4KB on my system; for simplicity, the reads are also 4KB, although they could be any size. This has a few weird corner cases (and inexplicable hangs, depending on the details), but in the form below it works reliably.
import subprocess, select, fcntl, os, sys
p1 = subprocess.Popen(["dd", "if=/dev/zero", "bs=4k", "count=244140"], stdout=subprocess.PIPE)
p2 = subprocess.Popen(["dd", "of=/dev/null", "bs=4k"], stdin=subprocess.PIPE)
def make_nonblocking(fd):
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
make_nonblocking(p1.stdout)
make_nonblocking(p2.stdin)
print "PIPE_BUF = %d" % (select.PIPE_BUF)
read_size = select.PIPE_BUF
max_buf_len = 1 # For reasons which I have not debugged completely, this hangs sometimes when set > 1
bufs = []
while True:
    inputready, outputready, exceptready = select.select([ p1.stdout.fileno() ],[ p2.stdin.fileno() ],[])
    for fd in inputready:
        if fd == p1.stdout.fileno():
            if len(bufs) < max_buf_len:
                data = p1.stdout.read(read_size)
                bufs.append(data)
    for fd in outputready:
        if fd == p2.stdin.fileno() and len(bufs) > 0:
            data = bufs.pop(0)
            p2.stdin.write(data)
    p1.poll()
    # If the first process is done and there is nothing more to write out
    if p1.returncode is not None and len(bufs) == 0:
        # Again cleanup is tricky. We expect the second process to finish soon after its input is closed
        p2.stdin.close()
        p2.wait()
        p1.wait()
        break
Results:
PIPE_BUF = 4096
244140+0 records in
244140+0 records out
999997440 bytes (1.0 GB) copied, 3.13722 s, 319 MB/s
244133+0 records in
244133+0 records out
999968768 bytes (1.0 GB) copied, 3.13599 s, 319 MB/s
real 0m3.167s
user 0m2.719s
sys 0m2.373s
This is however significantly slower than the version above (even if the read/write size is made 4KB in both for an apples-to-apples comparison). I'm not sure why.
P.S. Late addition: it appears that it is OK to ignore or exceed PIPE_BUF. This causes an IOError exception (errno 11, EAGAIN, "Resource temporarily unavailable") to be thrown much of the time from p2.stdin.write(), presumably when there is enough room in the pipe to write something, but less than the full size we are requesting. The same code above with read_size = 64*1024, and with that exception caught and ignored, runs at 1.4GB/s.
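In sketch form, the write in the select() loop above then becomes the following (the errno import and check are my additions; simply dropping the block on EAGAIN is only acceptable in this synthetic throughput test):

import errno

read_size = 64 * 1024  # deliberately well beyond PIPE_BUF

# ...inside the select() loop, replacing the plain p2.stdin.write(data):
for fd in outputready:
    if fd == p2.stdin.fileno() and len(bufs) > 0:
        data = bufs.pop(0)
        try:
            p2.stdin.write(data)
        except IOError as e:
            # errno 11 (EAGAIN): the pipe had room for part of the block,
            # but not all of it; ignore and move on (the data is discarded).
            if e.errno != errno.EAGAIN:
                raise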
Just as a baseline, how fast is it to run this using the shell version of pipes (in subprocess)? Let's have a look:
import subprocess
subprocess.call("dd if=/dev/zero bs=4k count=244140 | dd of=/dev/null bs=4k", shell=True)
Results:
244140+0 records in
244140+0 records out
244140+0 records in
244140+0 records out
999997440 bytes (1.0 GB) copied, 0.425261 s, 2.4 GB/s
999997440 bytes (1.0 GB) copied, 0.423687 s, 2.4 GB/s
real 0m0.466s
user 0m0.300s
sys 0m0.590s
This is notably faster than the threaded Python example. However, this is just one copy, while the threaded Python version is doing two (one into and one out of Python). Modifying the command to "dd if=/dev/zero bs=4k count=244140 | dd bs=4k | dd of=/dev/null bs=4k" brings the performance to 1.6GB/s, in line with the Python example.
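For reference, that three-dd baseline is just the quoted command dropped into the same subprocess.call() harness:

import subprocess
# Three dd processes: the data is now copied twice, matching the threaded
# Python version (one copy into Python, one copy out).
subprocess.call("dd if=/dev/zero bs=4k count=244140 | dd bs=4k | dd of=/dev/null bs=4k",
                shell=True)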
Some additional thoughts on how to run a comparison in a complete system. Again, for simplicity this is just two processes, and both scripts have the exact same convert_A_to_B() function.
Script 1: Pass data in python, as above
A_process = subprocess.Popen(["A", ...
B_process = subprocess.Popen(["B", ...
convert_A_to_B_thread = threading.Thread(target=convert_A_to_B, ...
Script 2: Comparison script, pass data in shell
convert_A_to_B(sys.stdin, sys.stdout)
Run this in the shell with: A | python script_2.py | B
This allows an apples-to-apples comparison in a complete system, without using mock functions/processes.
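A minimal script_2.py might look like the following; this is my sketch, reusing convert_A_to_B() from the first threaded example (note the error message goes to stderr here, since stdout now carries the data):

#!/usr/bin/python
# script_2.py: the shell does the plumbing: A | python script_2.py | B
import sys

def convert_A_to_B(src, dst):
    read_size = 8*1024
    while True:
        try:
            buf = src.read(read_size)
            if len(buf) == 0:  # src closed / EOF
                break
            dst.write(buf)
        except ValueError as e:  # reading or writing on a closed fd
            print >> sys.stderr, str(e)
            break

convert_A_to_B(sys.stdin, sys.stdout)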
For this test, the code from the first (threaded) example above is used, and both dd and the Python script are set to use the same block-size reads/writes. Results:
| Block size | Throughput |
|------------|------------|
| 1KB | 249MB/s |
| 2KB | 416MB/s |
| 4KB | 552MB/s |
| 8KB | 1.4GB/s |
| 16KB | 1.8GB/s |
| 32KB | 2.9GB/s |
| 64KB | 3.0GB/s |
| 128KB | 1.0GB/s |
| 256KB | 600MB/s |
In theory there should be better performance with larger buffers (perhaps up to cache effects), but in practice Linux pipes slow down with very large buffers, even when using pure shell pipes.