Python: threads managing events notified by other threads

I'm developing a multithreaded application in Python. In particular, in this application a thread should be able to generate an event that should be notified to one (or more) threads; the threads that receive the notification of the event should interrupt their execution and run a specific function. At the end of this service function, they should go back to do what they were doing before the event was generated.

In order to do something like this, I was thinking about using some kind of publish/subscribe module. I found one that is very easy to use: PyPubSub. Its documentation includes an extremely simple example of how to use it.
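The topic-based idea behind a library like PyPubSub boils down to registering listeners for a topic and then sending messages to that topic. Here is a stdlib-only sketch of that idea; the function names loosely mirror PyPubSub's `pub.subscribe` / `pub.sendMessage`, but this is an illustration of the pattern, not the library's actual implementation:

```python
from collections import defaultdict

# topic name -> list of listener callables
_subscribers = defaultdict(list)

def subscribe(listener, topic):
    """Register a listener for a topic."""
    _subscribers[topic].append(listener)

def send_message(topic, **kwargs):
    """Call every listener subscribed to the topic, synchronously."""
    for listener in _subscribers[topic]:
        listener(**kwargs)

received = []
subscribe(lambda value: received.append(value), 'sensor.update')
send_message('sensor.update', value=42)
print(received)   # [42]
```

With PyPubSub itself this would be roughly `from pubsub import pub`, then `pub.subscribe(listener, 'sensor.update')` and `pub.sendMessage('sensor.update', value=42)`. Note that, as in this sketch, the listeners run synchronously in the caller's thread.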

However, once I started using it, I realized that it does what I was looking for, but only when you work with a single process. With multiple threads, it suspends the whole process (and therefore all the threads in it) to run a specific routine. This is not the behavior I was looking for, and unfortunately I can't change my application from multithreaded to multiprocess.

Do you know of any module that can help me do what I am trying to do in a multithreaded application? Thanks.

Cell asked Oct 20 '22

1 Answer

There is no true concurrency in Python except via the multiprocessing module, since the GIL is then no longer part of the picture.

What you want to do requires an event loop in which you check an event queue and dispatch as appropriate. PyPubSub can likely make your life easier, but it might be overkill for what you want (as the author of PyPubSub I feel comfortable saying that :). Given how seamlessly the multiprocessing module integrates multiple processes, is there really a reason not to use it, if concurrency is really what you need?
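The bare shape of such an event loop, sketched with a stdlib `queue.Queue` (the names `events`, `handlers`, and `run_loop` are just for illustration):

```python
import queue

events = queue.Queue()   # the event queue other threads post to
handled = []             # record of dispatched events

# map event type -> handler function
handlers = {'tick': lambda e: handled.append(e['n'])}

def run_loop():
    """Drain the queue, dispatching each event to its handler."""
    while True:
        try:
            event = events.get(True, 0.1)   # block briefly, then give up
        except queue.Empty:
            break                           # nothing left to dispatch
        handlers.get(event['type'], lambda e: None)(event)

events.put({'type': 'tick', 'n': 1})
events.put({'type': 'tick', 'n': 2})
run_loop()
print(handled)   # [1, 2]
```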

The fact that you want an event to go from any thread to one or more other threads suggests a shared "post office" Queue that any thread can post to, with each item indicating the event type and event data. You would also have a message Queue for each thread: threads post to the shared post-office Queue, and the main process's event loop checks the post-office queue and copies each event to the individual thread message Queues, as appropriate. Each thread has to check its own queue regularly, handle each event, and remove it once processed. Each thread could subscribe with the main process for specific event types.

Here is an example of 3 auxiliary threads that send messages to each other:

from multiprocessing import Process, Queue, Lock
from queue import Empty as QueueEmpty
from random import randint


def log(lock, threadId, msg):
    # serialize printing so output from workers is not interleaved
    with lock:
        print('Thread', threadId, ':', msg)


def auxThread(id, lock, sendQueue, recvQueue, genType):
    ## Read from the queue
    log(lock, id, 'starting')
    while True:
        # send a message (once in a while!)
        if randint(1, 10) > 7:
            event = dict(type=genType, fromId=id, val=randint(1, 10))
            log(lock, id, 'putting message type "%(type)s" = %(val)s' % event)
            sendQueue.put(event)

        # block until we get a message, giving up after maxWait:
        maxWait = 1  # second
        try:
            msg = recvQueue.get(True, maxWait)
            log(lock, id, 'got message type "%(type)s" = %(val)s from thread %(fromId)s' % msg)
            if msg['val'] == 'DONE':
                break
        except QueueEmpty:
            pass

    log(lock, id, 'done')


def createThread(id, lock, postOffice, genType):
    # note: each "aux thread" is actually a multiprocessing.Process
    messagesForAux = Queue()
    args = (id, lock, postOffice, messagesForAux, genType)
    auxProc = Process(target=auxThread, args=args)
    auxProc.daemon = True
    return dict(q=messagesForAux, p=auxProc, id=id)


def mainThread():
    postOffice = Queue()  # where all threads post their messages
    lock = Lock()         # so print can be synchronized

    # setup threads:
    msgThreads = [
        createThread(1, lock, postOffice, 'heartbeat'),
        createThread(2, lock, postOffice, 'new_socket'),
        createThread(3, lock, postOffice, 'keypress'),
    ]

    # identify which threads listen for which messages
    dispatch = dict(
        heartbeat  = (2,),
        keypress   = (1,),
        new_socket = (3,),
    )

    # start all threads
    for th in msgThreads:
        th['p'].start()

    # process messages
    count = 0
    while True:
        try:
            maxWait = 1  # second
            msg = postOffice.get(True, maxWait)
            for threadId in dispatch[msg['type']]:
                thObj = msgThreads[threadId - 1]
                thObj['q'].put(msg)
            count += 1
            if count > 20:
                break

        except QueueEmpty:
            pass

    log(lock, 0, "Main thread sending exit signal to aux threads")
    for th in msgThreads:
        th['q'].put(dict(type='command', val='DONE', fromId=0))

    for th in msgThreads:
        th['p'].join()
        log(lock, th['id'], 'joined main')
    log(lock, 0, "DONE")


if __name__ == '__main__':
    mainThread()

You are entirely right that this description shares similarities with PyPubSub's functionality, but you would be using only a small part of PyPubSub. I think most of the complexity in your endeavor lies in the two types of queues; PyPubSub won't help much with that part of the problem. Once you have the queue system working with the multiprocessing module (as per my example), you can bring in PyPubSub and post/queue its messages rather than your own implementation of an event.
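Since the question is about a multithreaded (not multiprocess) application, it may be worth noting that the same post-office pattern can be sketched with `threading.Thread` and `queue.Queue`; the thread ids, event types, and the `main` helper below are illustrative only:

```python
import threading, queue, random

def aux_thread(tid, post_office, inbox, gen_type):
    """Worker: occasionally posts an event, and services its own inbox."""
    while True:
        if random.randint(1, 10) > 7:
            post_office.put(dict(type=gen_type, fromId=tid, val=random.randint(1, 10)))
        try:
            msg = inbox.get(True, 0.1)   # block briefly for a dispatched event
            if msg['val'] == 'DONE':
                break                    # exit signal from the main thread
        except queue.Empty:
            pass

def main():
    post_office = queue.Queue()                       # shared: anyone can post here
    inboxes = {1: queue.Queue(), 2: queue.Queue()}    # one message queue per thread
    dispatch = {'heartbeat': (2,), 'keypress': (1,)}  # event type -> subscriber ids

    threads = []
    for tid, typ in ((1, 'heartbeat'), (2, 'keypress')):
        t = threading.Thread(target=aux_thread,
                             args=(tid, post_office, inboxes[tid], typ),
                             daemon=True)
        threads.append(t)
        t.start()

    # main event loop: copy posted events to the subscribed threads' inboxes
    delivered = 0
    while delivered < 10:
        try:
            msg = post_office.get(True, 1)
            for tid in dispatch[msg['type']]:
                inboxes[tid].put(msg)
            delivered += 1
        except queue.Empty:
            pass

    for inbox in inboxes.values():                    # tell workers to stop
        inbox.put(dict(type='command', val='DONE', fromId=0))
    for t in threads:
        t.join()
    return delivered

if __name__ == '__main__':
    print(main())
```

Because `queue.Queue` is thread-safe, no extra locking is needed around the post office or the inboxes; the structure is otherwise the same as the multiprocessing version above.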

Oliver answered Oct 23 '22