I am attempting to use multiprocessing's Pool to run a group of processes, each of which will run a gevent pool of greenlets. The reason for this is that there is a lot of network activity, but also a lot of CPU activity, so to maximise my bandwidth and make use of all my CPU cores, I need multiple processes AND gevent's async monkey patching. I am using multiprocessing's Manager to create a queue which the processes will access to get data to process.
Here is a simplified fragment of the code:
import multiprocessing
from gevent import monkey
monkey.patch_all(thread=False)
manager = multiprocessing.Manager()
q = manager.Queue()
Here is the exception it produces:
Traceback (most recent call last):
  File "multimonkeytest.py", line 7, in <module>
    q = manager.Queue()
  File "/usr/local/Cellar/python/2.7.2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 667, in temp
    token, exp = self._create(typeid, *args, **kwds)
  File "/usr/local/Cellar/python/2.7.2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 565, in _create
    conn = self._Client(self._address, authkey=self._authkey)
  File "/usr/local/Cellar/python/2.7.2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/connection.py", line 175, in Client
    answer_challenge(c, authkey)
  File "/usr/local/Cellar/python/2.7.2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/connection.py", line 409, in answer_challenge
    message = connection.recv_bytes(256)         # reject large message
IOError: [Errno 35] Resource temporarily unavailable
I believe this must be due to some difference between the behaviour of the normal socket module and gevent's socket module.
If I monkeypatch within the subprocess, the queue is created successfully, but when the subprocess tries to get() from the queue, a very similar exception occurs. The socket module does need to be monkeypatched, because the subprocesses make large numbers of network requests.
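For reference, a minimal sketch of that subprocess-patching variant (worker is a placeholder name); the queue is created fine, but q.get() in the child raises a very similar IOError:

import multiprocessing

def worker(q):
    # Patch only inside the child; the parent keeps the stdlib socket,
    # so manager.Queue() in the parent succeeds...
    from gevent import monkey
    monkey.patch_all(thread=False)
    # ...but this call talks to the manager over a now-patched socket
    # and fails in much the same way.
    print(q.get())

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    q = manager.Queue()
    q.put('some work')
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    p.join()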
My version of gevent, which I believe is the latest:
>>> gevent.version_info
(1, 0, 0, 'alpha', 3)
Any ideas?
Use monkey.patch_all(thread=False, socket=False).
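That is, leave the socket module unpatched when applying the module-level patch, so multiprocessing.connection keeps talking over ordinary blocking sockets:

import multiprocessing
from gevent import monkey
# Everything except thread and socket gets patched; the Manager's
# authenticated connection then goes over the unpatched stdlib socket.
monkey.patch_all(thread=False, socket=False)
manager = multiprocessing.Manager()
q = manager.Queue()

This also leaves your own network code on blocking sockets, so, as the question notes, you may still need to patch socket separately inside the worker processes.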
I have run into the same issue in a similar situation and tracked it down to line 115 of gevent/monkey.py, in the patch_socket() function: _socket.socket = socket.socket. Commenting this line out prevents the breakage.

This is where gevent replaces the stdlib socket library with its own. multiprocessing.connection uses the socket library quite extensively, and is apparently not tolerant of this change.
Specifically, you will see this in any scenario where a module you import performs a gevent.monkey.patch_all() call without setting socket=False. In my case it was grequests that did this, and I had to override the patching of the socket module to fix this error.
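One possible way to do that override, sketched on the assumption that grequests is the module doing the patching: keep a reference to the original C-level socket class before the import and put it back afterwards, which has the same effect as commenting out the line mentioned above.

import _socket
_original_socket = _socket.socket   # save the stdlib C socket before anything patches it

import grequests                    # importing this runs gevent.monkey.patch_all()

# Undo the _socket.socket = socket.socket assignment made by patch_socket(),
# so multiprocessing.connection gets a blocking socket again.
_socket.socket = _original_socket

import multiprocessing
manager = multiprocessing.Manager()
q = manager.Queue()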
Using multiprocessing in the context of gevent is unfortunately known to cause problems. Your rationale, however, is reasonable ("a lot of network activity, but also a lot of CPU activity"). If you like, have a look at http://gehrcke.de/gipc, which is designed primarily for your use case. With gipc, you can easily spawn a few fully gevent-aware child processes and let them cooperatively talk to each other and/or to the parent through pipes.
If you have specific questions, you're welcome to get back to me.
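For illustration, a minimal sketch along the lines of gipc's documented pipe()/start_process() API (the child function writelet is just a placeholder name):

import gipc

def writelet(writer):
    # Runs in a gevent-aware child process; the write end of the pipe
    # is handed over by gipc.start_process().
    writer.put("hello from the child")

if __name__ == '__main__':
    with gipc.pipe() as (reader, writer):
        p = gipc.start_process(target=writelet, args=(writer,))
        print(reader.get())
        p.join()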
If you use the original Queue instead of the Manager's, your code will work normally even with the monkey-patched socket.
import multiprocessing
from gevent import monkey
monkey.patch_all(thread=False)
q = multiprocessing.Queue()
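A slightly fuller sketch of that approach, sharing the plain multiprocessing.Queue with a child process (worker is a placeholder name):

import multiprocessing
from gevent import monkey
monkey.patch_all(thread=False)

def worker(q):
    # multiprocessing.Queue is built on a pipe plus locks rather than the
    # authenticated socket connection a Manager proxy uses, so the patched
    # socket module does not get in the way here.
    print(q.get())

if __name__ == '__main__':
    q = multiprocessing.Queue()
    q.put('some work')
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    p.join()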