Writing an "interactive" client with Twisted/Autobahn Websockets

Maybe I'm missing something about Twisted's asynchronous design, but I can't find a way to call the sendMessage() method "externally". By this I mean sending messages from outside the Twisted/Autobahn WebSockets callback methods (such as onOpen(), or onMessage() when data arrives from the server).

Of course, I could launch a thread and call my_protocol_instance.sendMessage("hello"), but that would defeat the purpose of the asynchronous design, right?

As a concrete example, I need a top-level wrapper class that opens and manages the connection, so that I can call my_class.send_my_toplevel_message(msg) whenever I need to. How can I implement this?

I hope my explanation is clear.

Thanks

asked Sep 19 '13 15:09 by abranches


2 Answers

Why do you need a thread to call protocolInstance.sendMessage()? This can be done in the normal reactor loop.

The core of Twisted is the reactor, and things become much easier to reason about once you think of Twisted as reactive: it does something as a reaction (response) to something else.

I assume the thread you are talking about would itself be created, and would call sendMessage, because of some event, activity, or status change. I can hardly imagine a case where you would need to send a message out of the blue, with nothing to react to.

If, however, there is an event that should trigger sendMessage, there is no need to do that in a thread: use Twisted's mechanisms for catching that event, then call sendMessage from that event's callback.
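One way to picture this is a small wrapper that holds a reference to the connected protocol and buffers messages until the connection is open. The sketch below is only an illustration under stated assumptions: MessageSender, on_open, on_close and FakeProtocol are hypothetical names, and FakeProtocol stands in for a real Autobahn protocol so the example runs without a network. In a real client you would call sender.on_open(self) from the protocol's onOpen() and call sender.send(...) from whatever callback handles your event, always on the reactor thread.

```python
class MessageSender(object):
    """Hypothetical top-level wrapper: buffers messages until the protocol is open."""

    def __init__(self):
        self._protocol = None   # set once the connection is open
        self._pending = []      # messages requested before the connection opened

    def on_open(self, protocol):
        # Meant to be called from the protocol's onOpen(); flushes the buffer.
        self._protocol = protocol
        pending, self._pending = self._pending, []
        for msg in pending:
            protocol.sendMessage(msg)

    def on_close(self):
        # Meant to be called from onClose(); later sends are buffered again.
        self._protocol = None

    def send(self, msg):
        # Assumed to run on the reactor thread, e.g. from another event's callback.
        if self._protocol is None:
            self._pending.append(msg)
        else:
            self._protocol.sendMessage(msg)


class FakeProtocol(object):
    """Stand-in for a connected WebSocket protocol; it only records what is sent."""

    def __init__(self):
        self.sent = []

    def sendMessage(self, msg):
        self.sent.append(msg)


sender = MessageSender()
sender.send("queued before connect")   # buffered, no connection yet
proto = FakeProtocol()
sender.on_open(proto)                  # flushes the buffered message
sender.send("sent after connect")      # goes straight to the protocol
print(proto.sent)  # ['queued before connect', 'sent after connect']
```

Because every call happens on the reactor thread, this version needs no locks or queues; those only become necessary when send() is called from a different thread.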

Now on to your concrete example: can you specify what "whenever I need" means exactly in the context of this question? An input from another connection? An input from the user? Looping activity?

answered Oct 15 '22 21:10 by jbreicis


I managed to implement what I needed by running Twisted in another thread, which leaves my program free to run and lets it trigger sends inside Twisted with reactor.callFromThread().

What do you think?

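# Imports assumed for the 2013-era Python 2 / Autobahn API used below.
import logging
import threading
import Queue

from twisted.internet import reactor
from autobahn.websocket import (WebSocketClientFactory,
                                WebSocketClientProtocol, connectWS)

# Assumed: the original snippet does not show how `log` is defined.
log = logging.getLogger(__name__)

# ----- twisted ----------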
class _WebSocketClientProtocol(WebSocketClientProtocol):
    def __init__(self, factory):
        self.factory = factory

    def onOpen(self):
        log.debug("Client connected")
        self.factory.protocol_instance = self
        self.factory.base_client._connected_event.set()

class _WebSocketClientFactory(WebSocketClientFactory):
    def __init__(self, *args, **kwargs):
        WebSocketClientFactory.__init__(self, *args, **kwargs)
        self.protocol_instance = None
        self.base_client = None

    def buildProtocol(self, addr):
        return _WebSocketClientProtocol(self)
# ------ end twisted -------

class BaseWBClient(object):

    def __init__(self, websocket_settings):
        self.settings = websocket_settings
        # factory instance, created in connect()
        self.factory = None
        # this event will be triggered on onOpen()
        self._connected_event = threading.Event()
        # queue to hold not yet dispatched messages
        self._send_queue = Queue.Queue()
        self._reactor_thread = None

    def connect(self):
        log.debug("Connecting to %(host)s:%(port)d" % self.settings)
        self.factory = _WebSocketClientFactory(
                                "ws://%(host)s:%(port)d" % self.settings,
                                debug=True)
        self.factory.base_client = self
        c = connectWS(self.factory)
        self._reactor_thread = threading.Thread(target=reactor.run,
                                               args=(False,))
        self._reactor_thread.daemon = True
        self._reactor_thread.start()

    def send_message(self, body):
        if not self._check_connection():
            return
        log.debug("Queueing send")
        self._send_queue.put(body)
        reactor.callFromThread(self._dispatch)

    def _check_connection(self):
        if not self._connected_event.wait(timeout=10):
            log.error("Unable to connect to server")
            self.close()
            return False
        return True

    def _dispatch(self):
        log.debug("Dispatching")
        while True:
            try:
                body = self._send_queue.get(block=False)
            except Queue.Empty:
                break
            self.factory.protocol_instance.sendMessage(body)

    def close(self):
        reactor.callFromThread(reactor.stop)
answered Oct 15 '22 19:10 by abranches