Pattern for a background Twisted server that fills an incoming message queue and empties an outgoing message queue?

I'd like to do something like this:

twistedServer.start() # This would be a nonblocking call

while True:
    while twistedServer.haveMessage():
        message = twistedServer.getMessage()
        response = handleMessage(message)
        twistedServer.sendResponse(response)
    doSomeOtherLogic()

The key thing I want to do is run the server in a background thread. I'm hoping to do this with a thread instead of through multiprocessing/queue because I already have one layer of messaging for my app and I'd like to avoid two. I'm bringing this up because I can already see how to do this in a separate process, but what I'd like to know is how to do it in a thread, or if I can. Or if perhaps there is some other pattern I can use that accomplishes this same thing, like perhaps writing my own reactor.run method. Thanks for any help. :)

asked Nov 12 '10 03:11 by shino



1 Answer

The key thing I want to do is run the server in a background thread.

You don't explain why this is key, though. Generally, things like "use threads" are implementation details. Perhaps threads are appropriate, perhaps not, but the actual goal is agnostic on the point. What is your goal? To handle multiple clients concurrently? To handle messages of this sort simultaneously with events from another source (for example, a web server)? Without knowing the ultimate goal, there's no way to know if an implementation strategy I suggest will work or not.

With that in mind, here are two possibilities.

First, you could forget about threads. This would mean reducing the logic above to just the event-handling parts. The part that tries to get an event would be delegated to another part of the application, probably something ultimately based on one of the reactor APIs (for example, you might set up a TCP server that accepts messages and turns them into the events you're processing, in which case you would start off with a call to reactor.listenTCP of some sort).
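With reactor.listenTCP, Twisted calls the dataReceived method of your protocol with raw bytes as they arrive, and TCP gives no guarantee that one read equals one message. Turning that byte stream into discrete events is the protocol's job (Twisted ships helpers such as LineReceiver for this). The dependency-free sketch below only illustrates that framing step, assuming newline-delimited messages; LineFramer and Collector are hypothetical names, not Twisted classes.

```python
class LineFramer:
    """Hypothetical helper: turns a TCP-like byte stream into discrete messages.

    Bytes may arrive in arbitrary chunks, so one message can be split across
    several reads, or several messages can arrive in a single read.
    """

    def __init__(self, handler, delimiter=b"\n"):
        self.handler = handler      # any object with messageReceived(bytes)
        self.delimiter = delimiter
        self._buffer = b""          # bytes of a not-yet-complete message

    def dataReceived(self, data):
        """Feed raw bytes; dispatch every complete message to the handler."""
        self._buffer += data
        while self.delimiter in self._buffer:
            message, self._buffer = self._buffer.split(self.delimiter, 1)
            self.handler.messageReceived(message)


class Collector:
    """Stand-in event handler that just records the messages it receives."""

    def __init__(self):
        self.messages = []

    def messageReceived(self, message):
        self.messages.append(message)


collector = Collector()
framer = LineFramer(collector)
framer.dataReceived(b"hel")       # partial message: nothing dispatched yet
framer.dataReceived(b"lo\nwor")   # completes b"hello", starts b"wor"
framer.dataReceived(b"ld\n")      # completes b"world"
print(collector.messages)         # [b'hello', b'world']
```

The handler never sees partial data, which is what lets the event-handling code stay a simple per-message callback.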

So your example might turn into something like this (with some added specificity to try to increase the instructive value):

from twisted.internet import reactor

class MessageReverser(object):
    """
    Accept messages, reverse them, and send them onwards.
    """
    def __init__(self, server):
        self.server = server

    def messageReceived(self, message):
        """
        Callback invoked whenever a message is received.  This implementation
        will reverse and re-send the message.
        """
        self.server.sendMessage(message[::-1])
        # doSomeOtherLogic is your own function, defined elsewhere.
        doSomeOtherLogic()

def main():
    # twistedServer is whatever network-facing object you use;
    # its definition is not shown here.
    twistedServer = ...
    twistedServer.start(MessageReverser(twistedServer))
    # Blocks until something calls reactor.stop().
    reactor.run()

if __name__ == "__main__":
    main()

Several points to note about this example:

  • I'm not sure how your twistedServer is defined. I'm imagining that it interfaces with the network in some way. Your version of the code would have had it receiving messages and buffering them until they were removed from the buffer by your loop for processing. This version would probably have no buffer, but instead just call the messageReceived method of the object passed to start as soon as a message arrives. You could still add buffering of some sort if you want, by putting it into the messageReceived method.

  • There is now a call to reactor.run which will block. You might instead write this code as a twistd plugin or a .tac file, in which case you wouldn't be directly responsible for starting the reactor. However, someone must start the reactor, or most APIs from Twisted won't do anything. reactor.run blocks, of course, until someone calls reactor.stop.

  • There are no threads used by this approach. Twisted's cooperative multitasking approach to concurrency means you can still do multiple things at once, as long as you're mindful to cooperate (which usually means returning to the reactor once in a while).

  • The exact times at which doSomeOtherLogic is called change slightly, because there's no longer any notion of "the buffer is empty for now" separate from "I just handled a message". You could change this so that the function is called once a second, or after every N messages, or whatever is appropriate.
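To make the push-style interface and the "after every N messages" variant concrete, here is a small sketch that runs without Twisted or a network. FakeServer is a hypothetical in-memory stand-in for twistedServer (the answer leaves its real definition open), and BatchingReverser is a hypothetical variation on MessageReverser.

```python
class FakeServer:
    """Hypothetical in-memory stand-in for twistedServer (no networking).

    It keeps no buffer of its own: each delivered message is pushed straight
    to the handler's messageReceived method.
    """

    def __init__(self):
        self.handler = None
        self.sent = []

    def start(self, handler):
        self.handler = handler

    def sendMessage(self, message):
        self.sent.append(message)

    def deliver(self, message):
        # A real server would do this when a message arrives from the network.
        self.handler.messageReceived(message)


class BatchingReverser:
    """Reverse each message and run extra logic after every N messages."""

    def __init__(self, server, every, otherLogic):
        self.server = server
        self.every = every
        self.otherLogic = otherLogic
        self.count = 0

    def messageReceived(self, message):
        self.server.sendMessage(message[::-1])
        self.count += 1
        if self.count % self.every == 0:
            self.otherLogic()


calls = []
server = FakeServer()
server.start(BatchingReverser(server, every=2, otherLogic=lambda: calls.append("tick")))
for msg in ["abc", "hello", "xyz"]:
    server.deliver(msg)
print(server.sent)   # ['cba', 'olleh', 'zyx']
print(calls)         # ['tick'] (only after the 2nd message)
```

For the "once a second" variant in real Twisted code, twisted.internet.task.LoopingCall(doSomeOtherLogic).start(1.0) asks the reactor to call the function every second, independently of message traffic.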

The second possibility would be to really use threads. This might look very similar to the previous example, but you would call reactor.run in another thread, rather than the main thread. For example:

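from queue import Queue  # the module is named "Queue" in Python 2
from threading import Thread

from twisted.internet import reactor

class MessageQueuer(object):
    def __init__(self, queue):
        self.queue = queue

    def messageReceived(self, message):
        # Runs in the reactor thread; Queue.put is thread-safe.
        self.queue.put(message)

def main():
    queue = Queue()
    twistedServer = ...
    twistedServer.start(MessageQueuer(queue))
    # Signal handlers can only be installed from the main thread.
    Thread(target=reactor.run, kwargs={"installSignalHandlers": False}).start()

    while True:
        message = queue.get()
        # handleMessage is your own function, defined elsewhere.
        response = handleMessage(message)
        # Hand the Twisted call over to the reactor thread.
        reactor.callFromThread(twistedServer.sendResponse, response)

if __name__ == "__main__":
    main()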

This version assumes a twistedServer which works similarly, but uses a thread to let you have the while True: loop. Note:

  • You must invoke reactor.run(False) (that is, with installSignalHandlers=False) if you run the reactor in a thread other than the main one, to prevent Twisted from trying to install any signal handlers, which Python only allows to be installed in the main thread. As a consequence, Ctrl-C handling will be disabled and reactor.spawnProcess won't work reliably.

  • MessageQueuer has the same interface as MessageReverser; only its implementation of messageReceived is different. It uses the thread-safe Queue object to communicate between the reactor thread (in which it will be called) and your main thread, where the while True: loop is running.

  • You must use reactor.callFromThread to send the message back to the reactor thread (assuming twistedServer.sendResponse is actually based on Twisted APIs). Twisted APIs are typically not thread-safe and must be called in the reactor thread; reactor.callFromThread schedules the call to run there for you.

  • You'll want to implement some way to stop both the loop and the reactor, one supposes. The Python process won't exit cleanly until after you call reactor.stop (from the non-reactor thread, that means reactor.callFromThread(reactor.stop)).
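The rule behind callFromThread is that one thread owns the event loop's state and other threads only enqueue work for it. The stdlib-only sketch below mimics that hand-off and a clean shutdown; MiniLoop and call_from_thread are hypothetical names and a heavy simplification, not how Twisted's reactor is implemented. To keep it self-contained, the incoming queue is pre-filled by the main thread instead of by a reactor-side MessageQueuer.

```python
import queue
import threading


class MiniLoop:
    """Hypothetical single-threaded event loop, loosely analogous to the reactor.

    Only the loop thread ever touches self.sent; other threads hand work to it
    through call_from_thread, which only enqueues the call.
    """

    _STOP = object()  # sentinel that tells run() to return

    def __init__(self):
        self._calls = queue.Queue()  # thread-safe hand-off channel
        self.sent = []               # state owned by the loop thread

    def call_from_thread(self, func, *args):
        self._calls.put((func, args))

    def stop(self):
        self._calls.put((self._STOP, ()))

    def run(self):
        while True:
            func, args = self._calls.get()
            if func is self._STOP:
                return
            func(*args)


loop = MiniLoop()
loop_thread = threading.Thread(target=loop.run)
loop_thread.start()

incoming = queue.Queue()
for text in ["one", "two", "three"]:
    incoming.put(text)
incoming.put(None)  # sentinel: no more messages

# The "while True" loop from the answer, running in the main thread.
while True:
    message = incoming.get()
    if message is None:
        break
    response = message.upper()  # stand-in for handleMessage()
    loop.call_from_thread(loop.sent.append, response)

loop.stop()         # analogous to reactor.callFromThread(reactor.stop)
loop_thread.join()  # the process can now exit cleanly
print(loop.sent)    # ['ONE', 'TWO', 'THREE']
```

Because the stop request travels through the same FIFO queue as the work, every response queued before it is processed before the loop exits.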

Note that while the threaded version gives you the familiar, desired while True loop, it doesn't actually do anything much better than the non-threaded version. It's just more complicated. So, consider whether you actually need threads, or if they're merely an implementation technique that can be exchanged for something else.

answered Sep 21 '22 06:09 by Jean-Paul Calderone
