
Python ZMQ and multiprocessing causes zmq.error.ZMQError: Interrupted system call

I have a Python script where I'm binding several (e.g., 5) ZMQ receiver sockets like so:

import zmq

context = zmq.Context()
receivers = []
for port in range(5555, 5560):  # ports 5555..5559, one PULL socket per GCE instance
    receiver = context.socket(zmq.PULL)
    receiver.bind("tcp://*:%d" % port)
    receivers.append(receiver)

Then I start some Google Compute Engine (GCE) instances, and each one connects a sender socket to its corresponding port.
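The sender side is not shown in the question. A minimal sketch of what each instance might run, assuming Python 3 with pyzmq and assuming each instance pickles one result and pushes it to its own port on the collector. The helper names `make_sender` and `send_result` and the `COLLECTOR_IP` placeholder are hypothetical, not taken from the question.

```python
import pickle
import zmq

# Hypothetical helpers: the question does not show the sender code.


def make_sender(context, endpoint):
    """Create a PUSH socket connected to one of the collector's PULL sockets."""
    sender = context.socket(zmq.PUSH)
    sender.connect(endpoint)
    return sender


def send_result(sender, result):
    """Pickle one result and push it to the collector."""
    sender.send(pickle.dumps(result, protocol=pickle.HIGHEST_PROTOCOL))
```

On instance number `i` this could be used as `send_result(make_sender(zmq.Context(), "tcp://COLLECTOR_IP:%d" % (5555 + i)), result)`, where `COLLECTOR_IP` stands for the collector's address.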

I'd like to pull from these sockets in parallel, and so I'm attempting to do that with a multiprocessing pool. The code looks something like this:

import cPickle
import multiprocessing

def recv_result(i):
    result_str = receivers[i].recv()
    return cPickle.loads(result_str)

pool = multiprocessing.Pool()
while True:
    results = pool.map(recv_result, [i for i in range(len(receivers))])
    # break when all results have been received
    ...

The error I'm getting when I run my script looks like this:

Traceback (most recent call last):
  ...
  File ...
    results = pool.map(recv_result, [i for i in range(len(receivers))])
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 227, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 528, in get
    raise self._value
zmq.error.ZMQError: Interrupted system call

I've also tried implementing the same functionality using multiprocessing.Process, but I get essentially the same error, though in a much messier fashion.

What I'm trying to do is receive all of the results from my GCE instances more efficiently, because I've found this to be the bottleneck in my script. (My current working implementation has only one receiver socket, and it receives the results from all of the GCE instances serially.) If anyone could point out the bug in my current code, or suggest a better way to achieve my goal, I would much appreciate it!

asked Oct 31 '22 at 21:10 by Marvin


1 Answer

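A few hints:

  • It is good that you use ZeroMQ; it can do a lot for you without much code.
  • Do not over-optimize. You are unlikely to gain anything from multiprocessing/threading for ZeroMQ communication: ZeroMQ already performs its network I/O in background threads and can exchange a very large number of messages.
  • If you use threading/multiprocessing, never share ZeroMQ sockets between threads, and never use a context (or its sockets) created before a fork inside the child process; create the context and sockets in the process that uses them. Your Pool workers are forked with copies of the parent's context and sockets, which is probably the cause of your exception.
  • If your current code uses blocking ZeroMQ sends and receives, change them to non-blocking ones. Look at polling (zmq.Poller), which lets a single thread wait on all of your receiver sockets at once.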
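Following the last hint, here is a minimal sketch of receiving from all of the sockets in a single process with `zmq.Poller` instead of a multiprocessing pool. It assumes Python 3 with pyzmq and uses `pickle` in place of the question's Python 2 `cPickle`; the helper names `bind_receivers` and `collect_results` and the timeout value are illustrative assumptions, not part of the original answer.

```python
import pickle
import zmq

# Hypothetical helpers illustrating the polling approach.


def bind_receivers(context, endpoints):
    """Bind one PULL socket per endpoint and return them as a list."""
    receivers = []
    for endpoint in endpoints:
        sock = context.socket(zmq.PULL)
        sock.bind(endpoint)
        receivers.append(sock)
    return receivers


def collect_results(receivers, expected, timeout_ms=1000):
    """Wait on all receivers from one thread until `expected` results arrive.

    Results are unpickled and returned in arrival order. Raises RuntimeError
    if no socket becomes readable within `timeout_ms` milliseconds.
    """
    poller = zmq.Poller()
    for sock in receivers:
        poller.register(sock, zmq.POLLIN)

    results = []
    while len(results) < expected:
        # poll() returns (socket, event) pairs for sockets with pending messages
        ready = dict(poller.poll(timeout_ms))
        if not ready:
            raise RuntimeError("timed out waiting for results")
        for sock in receivers:
            if sock in ready and len(results) < expected:
                # NOBLOCK is safe here: the poller reported a waiting message
                results.append(pickle.loads(sock.recv(zmq.NOBLOCK)))
    return results
```

For the question's setup this would be called as `receivers = bind_receivers(zmq.Context(), ["tcp://*:%d" % p for p in range(5555, 5560)])` followed by `collect_results(receivers, expected=len(receivers))`. No extra processes are involved, so no sockets are inherited across a fork. Since `pickle.loads` can execute arbitrary code, it should only be used on data from trusted senders such as your own instances.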
answered Nov 18 '22 at 01:11 by Jan Vlcinsky