I'm implementing a data subscriber in Python. It subscribes to a data publisher (actually a ZeroMQ publisher socket) and gets notified whenever new messages are published. In my subscriber, each message is handed to a data processor after it is received, and the subscriber is notified by the processor when processing is done. Since the data processor is written in C++, I have to extend the Python code with a simple C++ module.
Below is a simplified, runnable code sample of my data subscriber. The code in main.py, in which the module proc represents the processor, subscribes to a ZeroMQ socket on localhost:10000, sets up the callback, and sends each received message to the processor by calling proc.onMsg.
#!/bin/python
# main.py
import gevent
import logging
import zmq.green as zmq
import pub
import proc

logging.basicConfig( format='[%(levelname)s] %(message)s', level=logging.DEBUG )

SUB_ADDR = 'tcp://localhost:10000'

def setupMqAndReceive():
    '''Setup the message queue and receive messages.
    '''
    ctx = zmq.Context()
    sock = ctx.socket( zmq.SUB )
    # add topics
    sock.setsockopt_string( zmq.SUBSCRIBE, 'Hello' )
    sock.connect( SUB_ADDR )
    while True:
        msg = sock.recv().decode( 'utf-8' )
        proc.onMsg( msg )

def callback( a, b ):
    print( '[callback]', a, b )

def main():
    '''Entrance of the module.
    '''
    pub.start()
    proc.setCallback( callback )
    '''A simple one-liner
        gevent.spawn( setupMqAndReceive ).join()
    works. However, the received messages will not be
    processed by the processor.
    '''
    gevent.spawn( setupMqAndReceive )
    proc.start()

if __name__ == '__main__':
    main()
Module proc is simplified, with three functions exported:
- setCallback sets up the callback function, so that my subscriber can be notified when a message has been processed;
- onMsg is invoked by the subscriber;
- start sets up a new worker thread to handle the messages from the subscriber, and makes the main thread join it to wait for the worker thread to exit.
The full version of the source code can be found on GitHub at https://github.com/more-more-tea/python_gil. However, it does not run as I expect. Once the processor thread is added, the subscriber cannot receive data from the publisher in the gevent loop. If I simply drop the data processor module, the subscriber's gevent loop receives the messages from the publisher.
Is there anything wrong with the code? I suspect that the GIL interferes with the concurrency of the pthread in the message processor, or that the gevent loop is starved. Any hints about the question, or about how to debug it, will be highly appreciated!
The Global Interpreter Lock, by itself, will not prevent threads from being scheduled. The Python C API does not run around injecting itself into the pthread library everywhere. This is both good and bad.
It is good because you can actually do multiple things at once in a C or C++ extension.
It is bad because you can violate the GIL rules accidentally.
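The rules of the GIL are (roughly) as follows:
- You must hold the GIL whenever you touch Python objects or call into the Python C API. This includes operations as simple as Py_INCREF() and Py_DECREF().
- You should not hold the GIL across a blocking call such as pthread_join() or select(). If you do, nobody else can acquire it, which means you block the whole interpreter.
The formal version of these rules is specified in the "Thread State and the Global Interpreter Lock" section of the Python C API documentation. Pay close attention to the "Non-Python created threads" section; it is about precisely what you are trying to do.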
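The "do not block while holding the GIL" rule can be observed from Python itself. The following is only a sketch, not your code: it assumes a POSIX system and a standard (GIL-enabled) CPython build, and it relies on the documented difference between ctypes.CDLL, which releases the GIL around each foreign call, and ctypes.PyDLL, which keeps it. The helper name ticks_during is made up for this illustration.

```python
import ctypes
import threading
import time

# Assumption: POSIX, where loading "None" exposes libc symbols such as usleep.
releasing_libc = ctypes.CDLL(None)   # ctypes drops the GIL around each call
holding_libc = ctypes.PyDLL(None)    # ctypes keeps the GIL during each call


def ticks_during(blocking_call):
    '''Count how often a Python thread gets to run while blocking_call() is in progress.'''
    ticks = 0
    done = threading.Event()

    def ticker():
        nonlocal ticks
        while not done.is_set():
            ticks += 1
            time.sleep(0.01)

    thread = threading.Thread(target=ticker)
    thread.start()
    blocking_call()      # blocks the calling thread for about 0.3 s
    done.set()
    thread.join()
    return ticks


if __name__ == '__main__':
    print('GIL released while blocking:',
          ticks_during(lambda: releasing_libc.usleep(300000)))
    print('GIL held while blocking:    ',
          ticks_during(lambda: holding_libc.usleep(300000)))
```

With the GIL released, the ticker thread should keep running during the sleep; with the GIL held, it should barely advance, which is the same kind of starvation a gevent loop would suffer.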
Reading your code, it looks like you have failed to acquire the GIL in the procThread() function, and also failed to release it before calling pthread_join(). There may be other problems as well, but these were the most obvious to me.
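Both points can be mimicked without writing any C++, which may help when experimenting. This is a rough sketch under several assumptions: a POSIX system, pthread_t representable as an unsigned long (true on Linux), and a hypothetical helper named run_on_native_thread that is not part of your module. ctypes callbacks acquire the GIL for you when invoked from a foreign thread, which is the job PyGILState_Ensure() and PyGILState_Release() would do by hand in procThread(); and because the library is loaded with CDLL, the GIL is released while pthread_join() waits.

```python
import ctypes

# Assumption: POSIX; loading "None" exposes pthread_create and pthread_join.
libc = ctypes.CDLL(None)             # CDLL releases the GIL around each call

PTHREAD_T = ctypes.c_ulong           # assumption: pthread_t fits an unsigned long
THREAD_FUNC = ctypes.CFUNCTYPE(ctypes.c_void_p, ctypes.c_void_p)

libc.pthread_create.argtypes = [ctypes.POINTER(PTHREAD_T), ctypes.c_void_p,
                                THREAD_FUNC, ctypes.c_void_p]
libc.pthread_create.restype = ctypes.c_int
libc.pthread_join.argtypes = [PTHREAD_T, ctypes.c_void_p]
libc.pthread_join.restype = ctypes.c_int


def run_on_native_thread(py_func):
    '''Run py_func() on a thread created by pthread_create and wait for it.'''
    def entry(_arg):
        # ctypes has acquired the GIL before this Python code runs.
        py_func()
        return 0                     # thread exit value (NULL)

    c_entry = THREAD_FUNC(entry)     # keep a reference until the thread has exited
    tid = PTHREAD_T()
    rc = libc.pthread_create(ctypes.byref(tid), None, c_entry, None)
    if rc != 0:
        raise OSError(rc, 'pthread_create failed')
    # The GIL is released during this call, so the native thread can take it.
    rc = libc.pthread_join(tid, None)
    if rc != 0:
        raise OSError(rc, 'pthread_join failed')


if __name__ == '__main__':
    processed = []
    run_on_native_thread(lambda: processed.append('message processed'))
    print(processed)
```

Loading the library with ctypes.PyDLL instead would keep the GIL during pthread_join(), and the program would then be expected to hang: the joining thread waits for the worker, and the worker waits for the GIL.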
Here is my solution to the question, together with my understanding of Python threads and native pthread ones.
Python threads, though protected by the GIL, are actually system threads. The only thing that makes them different is that, while running, a Python thread is protected by the GIL. Threads spawned by threading.Thread are Python threads, and all the code running in those threads is protected by the GIL automatically. In C/C++ extension code running in a Python thread, the GIL must be released with Py_BEGIN_ALLOW_THREADS and Py_END_ALLOW_THREADS if native threads co-exist with the Python threads and the Python thread is about to make a blocking call, e.g. I/O, joining a thread, sleeping, etc.
Other threads spawned outside the Python world, e.g. by the pthread library, should acquire the GIL explicitly with the Python C API functions PyGILState_Ensure and PyGILState_Release when executing Python code (pure C/C++ code does not need to acquire the GIL, in my experience), as directed in Kevin's answer.
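To check the first part of this understanding, here is a small sketch of my own (the names worker and run_workers are just for illustration; threading.get_native_id needs Python 3.8 or later). Each threading.Thread reports its own operating-system thread id, and because time.sleep releases the GIL by itself, several sleeping threads overlap instead of running one after another.

```python
import threading
import time

SLEEP_SECONDS = 0.2


def worker(native_ids, index):
    # Record the OS-level id of the thread this function runs on.
    native_ids[index] = threading.get_native_id()
    # time.sleep releases the GIL while it blocks, so other threads can run.
    time.sleep(SLEEP_SECONDS)


def run_workers(count):
    '''Start count threads; return their native ids and the total elapsed time.'''
    native_ids = [None] * count
    threads = [threading.Thread(target=worker, args=(native_ids, i))
               for i in range(count)]
    started = time.monotonic()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return native_ids, time.monotonic() - started


if __name__ == '__main__':
    ids, elapsed = run_workers(4)
    print('native thread ids:', ids)
    print('elapsed seconds:', round(elapsed, 2))
```

If the sleeps were serialized by the GIL, the elapsed time would be close to four times SLEEP_SECONDS; in practice it should stay close to a single SLEEP_SECONDS.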
The updated code can be found on GitHub.
If I have misunderstood anything, please leave me a comment. Thank you all!