I'm developing a multithreaded application in Python. In this application, a thread should be able to generate an event that is delivered to one or more other threads; the threads that receive the event should interrupt what they are doing and run a specific function. When this service function finishes, they should go back to what they were doing before the event was generated.
To do something like this, I was thinking about using some kind of publish/subscribe module. I found one that is very easy to use: PyPubSub. You can find here an extremely simple example of how to use it.
However, when I started to use it, I realized that it does what I was looking for only when each process has a single thread. If a process has several threads, it suspends the whole process (so, all threads in it) to run a specific routine. This is not the behavior I was looking for. Unfortunately, I can't change my application from multithreaded to multiprocess.
Do you know any module that can help me to do what I am trying to do in a multithreaded application? Thanks.
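In CPython, threads do not give you true concurrency, because the GIL (global interpreter lock) lets only one thread execute Python code at a time. The exception is the multiprocessing module: each process has its own interpreter, so the GIL is then not part of the picture.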
What you want to do requires an event loop in which you check an event queue and dispatch as appropriate. Pypubsub can likely make your life easier but might be overkill for what you want (as the author of pubsub I feel comfortable saying that :). Given how seamlessly the multiprocessing module integrates multiple processes, is there really a reason not to use it if concurrency is really what you need?
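To make the event-loop idea concrete, here is a minimal sketch for a single worker thread, using only the standard `threading` and `queue` modules. The names `worker`, `handlers` and `run_demo` are made up for illustration, and the short `get` timeout merely stands in for a slice of the thread's normal work. Note that the worker is not interrupted from outside: it services events cooperatively, whenever it gets around to checking its queue.

```python
import queue
import threading


def worker(events, handlers, results):
    """Event loop: alternate between normal work and servicing pending events."""
    while True:
        try:
            # The short wait stands in for one slice of the thread's normal work.
            event = events.get(timeout=0.1)
        except queue.Empty:
            continue  # nothing pending, go back to normal work
        if event["type"] == "stop":
            break
        handler = handlers.get(event["type"])
        if handler is not None:  # events without a handler are ignored
            results.append(handler(event["data"]))


def run_demo():
    events = queue.Queue()
    results = []
    # hypothetical dispatch table: event type -> service function
    handlers = {"keypress": lambda data: "key:" + data}
    t = threading.Thread(target=worker, args=(events, handlers, results))
    t.start()
    events.put({"type": "keypress", "data": "a"})
    events.put({"type": "unknown", "data": None})
    events.put({"type": "stop", "data": None})
    t.join()
    return results


if __name__ == "__main__":
    print(run_demo())
```

The service function runs in the worker's own thread, so the latency of the "interruption" is bounded by how often the worker checks its queue.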
The fact that you want an event to go from any thread to one or more threads indicates that you could use a shared post Queue that any thread can post to, with data indicating the event type and the event data. You would also have a message Queue for each thread: threads post to the shared post Queue, and the main process event loop checks the post queue and copies events to the individual thread message Queues, as appropriate. Each thread has to check its queue regularly and process the events it finds, removing each one once processed. Each thread could subscribe with the main process for specific events.
Here is an example of 3 auxiliary "threads" (implemented as processes via the multiprocessing module) that send messages to each other:
from multiprocessing import Process, Queue, Lock
from queue import Empty as QueueEmpty  # on Python 2: from Queue import Empty
from random import randint


def log(lock, threadId, msg):
    # hold the lock so output from different processes does not interleave
    with lock:
        print('Thread', threadId, ':', msg)


def auxThread(id, lock, sendQueue, recvQueue, genType):
    # post events now and then, and read from our own message queue
    log(lock, id, 'starting')
    while True:
        # send a message (once in a while!)
        if randint(1, 10) > 7:
            event = dict(type=genType, fromId=id, val=randint(1, 10))
            log(lock, id, 'putting message type "%(type)s" = %(val)s' % event)
            sendQueue.put(event)

        # block until we get a message, or until maxWait has elapsed:
        maxWait = 1  # second
        try:
            msg = recvQueue.get(True, maxWait)
            log(lock, id, 'got message type "%(type)s" = %(val)s from thread %(fromId)s' % msg)
            if msg['val'] == 'DONE':
                break
        except QueueEmpty:
            pass

    log(lock, id, 'done')


def createThread(id, lock, postOffice, genType):
    messagesForAux = Queue()  # private message queue for this thread
    args = (id, lock, postOffice, messagesForAux, genType)
    auxProc = Process(target=auxThread, args=args)
    auxProc.daemon = True
    return dict(q=messagesForAux, p=auxProc, id=id)


def mainThread():
    postOffice = Queue()  # where all threads post their messages
    lock = Lock()  # so print can be synchronized

    # setup threads:
    msgThreads = [
        createThread(1, lock, postOffice, 'heartbeat'),
        createThread(2, lock, postOffice, 'new_socket'),
        createThread(3, lock, postOffice, 'keypress'),
    ]

    # identify which threads listen for which messages
    dispatch = dict(
        heartbeat=(2,),
        keypress=(1,),
        new_socket=(3,),
    )

    # start all threads
    for th in msgThreads:
        th['p'].start()

    # process messages: copy each posted event to its listeners' queues
    count = 0
    while True:
        try:
            maxWait = 1  # second
            msg = postOffice.get(True, maxWait)
            for threadId in dispatch[msg['type']]:
                thObj = msgThreads[threadId - 1]
                thObj['q'].put(msg)
            count += 1
            if count > 20:
                break
        except QueueEmpty:
            pass

    log(lock, 0, "Main thread sending exit signal to aux threads")
    for th in msgThreads:
        th['q'].put(dict(type='command', val='DONE', fromId=0))

    for th in msgThreads:
        th['p'].join()
        log(lock, th['id'], 'joined main')

    log(lock, 0, "DONE")


if __name__ == '__main__':
    mainThread()
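Since the question is specifically about threads, the same post-office layout can also be sketched with `threading` and `queue.Queue` in place of processes. This is only an illustration under my own assumptions: the `PostOffice` class, its `subscribe`, `post` and `dispatch_one` methods, and `run_demo` are hypothetical names, not part of pypubsub or the standard library. It also shows one way threads could subscribe for specific events instead of relying on a hard-coded dispatch table.

```python
import queue
import threading


class PostOffice:
    """Hypothetical helper: routes posted events to subscribers' mailboxes."""

    def __init__(self):
        self.inbox = queue.Queue()   # shared queue that any thread posts to
        self._subscribers = {}       # event type -> list of per-thread queues
        self._lock = threading.Lock()

    def subscribe(self, event_type, mailbox):
        with self._lock:
            self._subscribers.setdefault(event_type, []).append(mailbox)

    def post(self, event_type, val, from_id):
        self.inbox.put({"type": event_type, "val": val, "fromId": from_id})

    def dispatch_one(self, timeout=1.0):
        """Copy one posted event to every subscribed mailbox; return the count."""
        try:
            msg = self.inbox.get(timeout=timeout)
        except queue.Empty:
            return 0
        with self._lock:
            targets = list(self._subscribers.get(msg["type"], []))
        for mailbox in targets:
            mailbox.put(msg)
        return len(targets)


def listener(mailbox, received):
    """Per-thread loop: record messages until a DONE value arrives."""
    while True:
        msg = mailbox.get()
        if msg["val"] == "DONE":
            break
        received.append((msg["type"], msg["val"]))


def run_demo():
    office = PostOffice()
    mailboxes = {1: queue.Queue(), 2: queue.Queue()}
    received = {1: [], 2: []}
    office.subscribe("heartbeat", mailboxes[2])
    office.subscribe("keypress", mailboxes[1])
    threads = [
        threading.Thread(target=listener, args=(mailboxes[i], received[i]))
        for i in mailboxes
    ]
    for t in threads:
        t.start()
    office.post("heartbeat", 5, from_id=1)
    office.post("keypress", 7, from_id=2)
    office.dispatch_one()
    office.dispatch_one()
    for box in mailboxes.values():
        box.put({"type": "command", "val": "DONE", "fromId": 0})
    for t in threads:
        t.join()
    return received


if __name__ == "__main__":
    print(run_demo())
```

Here the dispatching is done by whichever thread calls `dispatch_one`; in a real application that would be the main thread's event loop. Because of the GIL, the threads take turns rather than run in parallel, but each one still services only its own mailbox.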
You are entirely right that this description shares similarities with pypubsub functionality, but you would be using only a small part of pypubsub. I think most of the complexity in your endeavor is in the two types of queues, and pypubsub won't help much with that part of the problem. Once you have the queue system working using the multiprocessing module (as per my example), you can bring in pypubsub and post/queue its messages rather than your own implementation of an event.