I have a Python script where I'm binding several (e.g., 5) ZMQ receiver sockets like so:
receiver_1 = context.socket(zmq.PULL)
receiver_1.bind("tcp://*:5555")
...
receiver_5 = context.socket(zmq.PULL)
receiver_5.bind("tcp://*:5559")
receivers = [receiver_1, ..., receiver_5]
Then I start some Google Compute Engine instances, and they connect the corresponding sender sockets.
I'd like to pull from these sockets in parallel, and so I'm attempting to do that with a multiprocessing pool. The code looks something like this:
def recv_result(i):
result_str = receivers[i].recv()
return cPickle.loads(result_str)
pool = multiprocessing.Pool()
while True:
results = pool.map(recv_result, [i for i in range(len(receivers))])
# break when all results have been received
...
The error I'm getting when I run my script looks like this:
Traceback (most recent call last):
...
File ...
results = pool.map(recv_result, [i for i in range(len(receivers))])
File "/usr/lib/python2.7/multiprocessing/pool.py", line 227, in map
return self.map_async(func, iterable, chunksize).get()
File "/usr/lib/python2.7/multiprocessing/pool.py", line 528, in get
raise self._value
zmq.error.ZMQError: Interrupted system call
I've also tried implementing the same functionality using multiprocessing.Process, but I get essentially the same error, though in a much messier fashion.
What I'm trying to do is to more efficiently receive all of the results from my GCE instances, as I've found this to be the bottleneck in my script (in my current working implementation, I have only one receiver socket, and it receives results from all of the GCE instances serially). If anyone could point me to the bug in my current code, or any suggestions for a better way to achieve my goal, it would be much appreciated!
Few hints:
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With