I am doing IPC with a Python subprocess. For now, let's assume I have to use subprocess.Popen to spawn the other process, so I can't use multiprocessing.Pipe for communication. The first thing that came to my mind is to use the subprocess's STDIO streams with pickle.load/pickle.dump (don't worry about security right now).
However, I noticed that the transfer rates are just terrible: on the order of 750 KB/s on my machine! This is slower than communicating via multiprocessing.Pipe by a factor of 95, even though the latter uses pickle as well, as far as I understand it. There is no benefit in using cPickle either.
(Update: I realized this is only the case on Python 2! On Python 3 it works fine.)
Why is this so terribly slow? I suspect the reason lies in the way IO is performed by .dump/.load via Python file objects rather than raw C file descriptors. Maybe it has something to do with the GIL?
Is there any cross-platform way to get the same speed as multiprocessing.Pipe?
I have already found out that on Linux it is possible to use _multiprocessing.Connection (or multiprocessing.connection.Connection on Python 3) to wrap the STDIO file descriptors of the subprocess and get what I want. However, this is not possible on win32, and I don't even know about Mac.
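For reference, a minimal sketch of that Linux approach (assuming proc is the Popen object running the worker script; the full benchmark below does the same thing inside PipeConnection):

import os
from _multiprocessing import Connection  # Python 2; see the try/except below

# Wrap duplicated stdio file descriptors of the subprocess in Connections.
to_child = Connection(os.dup(proc.stdin.fileno()))     # we write here
from_child = Connection(os.dup(proc.stdout.fileno()))  # we read here
to_child.send(100000)            # ask the worker for 100000 random numbers
print(from_child.recv().nbytes)  # receive the numpy array back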
Benchmark:
from __future__ import print_function
from timeit import default_timer
from subprocess import Popen, PIPE
import pickle
import sys
import os

import numpy

try:                         # python2
    from _multiprocessing import Connection as _Connection
except ImportError:          # python3
    from multiprocessing.connection import Connection as _Connection


def main(args):
    if args:
        # Invoked as the subprocess: serve requests on the inherited STDIO.
        worker(connect(args[0], sys.stdin, sys.stdout))
    else:
        benchmark()

def worker(conn):
    while True:
        try:
            amount = conn.recv()
        except EOFError:
            break
        else:
            conn.send(numpy.random.random(amount))
    conn.close()

def benchmark():
    for amount in numpy.arange(11) * 10000:
        pickle = parent('pickle', amount, 1)    # (nbytes, seconds) tuple
        pipe = parent('pipe', amount, 1)
        print(pickle[0]/1000, pickle[1], pipe[1])

def parent(channel, amount, repeat):
    start = default_timer()
    proc = Popen([sys.executable, '-u', __file__, channel],
                 stdin=PIPE, stdout=PIPE)
    conn = connect(channel, proc.stdout, proc.stdin)
    for i in range(repeat):
        conn.send(amount)
        data = conn.recv()
    conn.close()
    end = default_timer()
    return data.nbytes, end - start

class PickleConnection(object):
    """Send/recv objects with pickle on plain Python file objects."""
    def __init__(self, recv, send):
        self._recv = recv
        self._send = send
    def recv(self):
        return pickle.load(self._recv)
    def send(self, data):
        pickle.dump(data, self._send)
    def close(self):
        self._recv.close()
        self._send.close()

class PipeConnection(object):
    """Send/recv objects with multiprocessing's Connection on raw fds."""
    def __init__(self, recv_fd, send_fd):
        self._recv = _Connection(recv_fd)
        self._send = _Connection(send_fd)
    def recv(self):
        return self._recv.recv()
    def send(self, data):
        self._send.send(data)
    def close(self):
        self._recv.close()
        self._send.close()

def connect(channel, recv, send):
    # Duplicate the fds so closing the originals doesn't kill the channel.
    recv_fd = os.dup(recv.fileno())
    send_fd = os.dup(send.fileno())
    recv.close()
    send.close()
    if channel == 'pipe':
        return PipeConnection(recv_fd, send_fd)
    elif channel == 'pickle':
        return PickleConnection(os.fdopen(recv_fd, 'rb', 0),
                                os.fdopen(send_fd, 'wb', 0))
    else:
        raise ValueError("Invalid channel: %s" % channel)

if __name__ == '__main__':
    main(sys.argv[1:])
Results:
[plot comparing the pickle and pipe transfer rates omitted]
Many thanks for reading,
Thomas
Update:
Okay, so I profiled this as suggested by @martineau. The following results were obtained in independent invocations, each for a single run with a fixed value of amount=500000.
In the parent process, the top calls sorted by tottime are:
11916 function calls (11825 primitive calls) in 5.382 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
35 4.471 0.128 4.471 0.128 {method 'readline' of 'file' objects}
52 0.693 0.013 0.693 0.013 {method 'read' of 'file' objects}
4 0.062 0.016 0.063 0.016 {method 'decode' of 'str' objects}
In the subprocess:
11978 function calls (11855 primitive calls) in 5.298 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
52 4.476 0.086 4.476 0.086 {method 'write' of 'file' objects}
73 0.552 0.008 0.552 0.008 {repr}
3 0.112 0.037 0.112 0.037 {method 'read' of 'file' objects}
This got me worried that the use of readline may be the reason for the bad performance.
The following connection class uses only pickle.dumps/pickle.loads together with write/read, framing each message with a 4-byte length prefix.
import struct   # for the 4-byte length prefix

class DumpsConnection(object):
    def __init__(self, recv, send):
        self._recv = recv
        self._send = send
    def recv(self):
        raw_len = self._recvl(4)
        content_len = struct.unpack('>I', raw_len)[0]
        content = self._recvl(content_len)
        return pickle.loads(content)
    def send(self, data):
        content = pickle.dumps(data)
        self._send.write(struct.pack('>I', len(content)))
        self._send.write(content)
    def _recvl(self, size):
        # read() may return short on pipes, so loop until `size` bytes arrive
        data = b''
        while len(data) < size:
            packet = self._recv.read(size - len(data))
            if not packet:
                raise EOFError
            data += packet
        return data
    def close(self):
        self._recv.close()
        self._send.close()
Indeed, this is only 14 times slower than multiprocessing.Pipe (which is still terrible).
Profiling again, in the parent:
11935 function calls (11844 primitive calls) in 1.749 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
2 1.385 0.692 1.385 0.692 {method 'read' of 'file' objects}
4 0.125 0.031 0.125 0.031 {method 'decode' of 'str' objects}
4 0.056 0.014 0.228 0.057 pickle.py:961(load_string)
In the child:
11996 function calls (11873 primitive calls) in 1.627 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
73 1.099 0.015 1.099 0.015 {repr}
3 0.231 0.077 0.231 0.077 {method 'read' of 'file' objects}
2 0.055 0.028 0.055 0.028 {method 'write' of 'file' objects}
So I still have no real clue what to use instead.
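One tweak I have not tried yet (an untested sketch, not a measured fix): connect() opens the pickle channel with bufsize=0, so every read and write above is a separate syscall. Using buffered file objects, e.g. os.fdopen(recv_fd, 'rb', 1 << 16) and os.fdopen(send_fd, 'wb', 1 << 16), together with the binary pickle protocol, might close part of the remaining gap:

class BufferedDumpsConnection(DumpsConnection):
    # Same framing as DumpsConnection above, but meant for *buffered* file
    # objects and using the binary pickle protocol instead of the ASCII default.
    def send(self, data):
        content = pickle.dumps(data, pickle.HIGHEST_PROTOCOL)
        self._send.write(struct.pack('>I', len(content)))
        self._send.write(content)
        self._send.flush()  # writes are buffered now, so push the message out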
There are some problems with pickle/cPickle when serializing numpy arrays:
In [14]: timeit cPickle.dumps(numpy.random.random(1000))
1000 loops, best of 3: 727 us per loop
In [15]: timeit numpy.random.random(1000).dumps()
10000 loops, best of 3: 31.6 us per loop
The problem only occurs when serializing; deserializing is fine:
In [16]: timeit cPickle.loads(numpy.random.random(1000).dumps())
10000 loops, best of 3: 40 us per loop
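The gap is most likely the pickle protocol (my assumption, worth verifying): on Python 2, cPickle.dumps defaults to protocol 0, which escapes the array buffer as ASCII text, while numpy's .dumps() uses the binary protocol 2 internally. Passing the protocol explicitly should close most of the gap:

import cPickle
import numpy

arr = numpy.random.random(1000)

slow = cPickle.dumps(arr)                            # protocol 0: ASCII-escaped
fast = cPickle.dumps(arr, cPickle.HIGHEST_PROTOCOL)  # protocol 2: binary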
You could use the marshal module, which is even faster (but not safe, and its format is specific to the CPython version):
In [19]: timeit marshal.loads(marshal.dumps(numpy.random.random(1000)))
10000 loops, best of 3: 29.8 us per loop
Well, I would have recommended msgpack, but it has no numpy support, and the one library that adds it is terribly slow. Anyway, python-msgpack supports neither buffers nor zero-copy, so efficient numpy support is not possible with it.
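If pickle remains the bottleneck, another option is to skip serialization entirely and ship the raw array buffer with a tiny dtype/shape header. A minimal sketch (send_array/recv_array are hypothetical helpers I made up, not from any library, and untested here):

import struct
import numpy

def send_array(f, arr):
    # hypothetical helper: length-prefixed dtype/shape header, then raw bytes
    header = '%s|%s' % (arr.dtype.str, ','.join(str(n) for n in arr.shape))
    f.write(struct.pack('>I', len(header)))
    f.write(header)
    f.write(arr.tostring())  # the array's raw buffer, no pickling involved
    f.flush()

def recv_array(f):
    # hypothetical helper; for robustness against short reads on pipes,
    # reuse a loop like DumpsConnection._recvl instead of bare f.read()
    (header_len,) = struct.unpack('>I', f.read(4))
    dtype_str, shape_str = f.read(header_len).split('|')
    shape = tuple(int(n) for n in shape_str.split(','))
    dtype = numpy.dtype(dtype_str)
    count = 1
    for n in shape:
        count *= n
    return numpy.fromstring(f.read(count * dtype.itemsize),
                            dtype=dtype).reshape(shape)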