Maybe I'm missing something in Twisted's asynchronous design, but I can't find a way to call the sendMessage() method "externally". By this I mean sending messages from somewhere other than the Twisted/Autobahn WebSocket callback methods (such as onOpen, or onMessage() when data arrives from the server).
Of course I could launch a thread and call my_protocol_instance.sendMessage("hello"), but that would defeat the whole purpose of the asynchronous design, right?
As a concrete example, I need a top-level wrapper class that opens the connection and manages it, so that whenever I need to I can call my_class.send_my_toplevel_message(msg). How can I implement this?
I hope my explanation is clear.
Thanks
Why do you need a thread to call protocolInstance.sendMessage()? This can be done in the normal reactor loop.
The core of Twisted is the reactor, and things become much easier to reason about once you think of Twisted itself as reactive: it does something as a reaction (response) to something else.
Now I assume that the thread you are talking about would also be created, and would call sendMessage, because of some event, activity, or status. I can hardly imagine a case where you would need to send a message out of the blue, with nothing to react to.
If, however, there is an event that should trigger sendMessage, there is no need to do that in a thread: just use Twisted's mechanisms for catching that event and call sendMessage from that event's callback.
Now on to your concrete example: can you specify what "whenever I need" means exactly in the context of this question? An input from another connection? An input from the user? Looping activity?
I managed to implement what I needed by running Twisted in another thread, which keeps my program free to run and lets it trigger sends in Twisted through reactor.callFromThread().
What do you think?
import logging
import threading
import Queue  # Python 2 module name; it is "queue" on Python 3

from twisted.internet import reactor
# Import path depends on the Autobahn version; very old releases
# exposed these names from autobahn.websocket instead.
from autobahn.twisted.websocket import (WebSocketClientFactory,
                                        WebSocketClientProtocol, connectWS)

log = logging.getLogger(__name__)


# ----- twisted ----------
class _WebSocketClientProtocol(WebSocketClientProtocol):
    def __init__(self, factory):
        self.factory = factory

    def onOpen(self):
        log.debug("Client connected")
        # expose the live protocol and unblock any waiting sender
        self.factory.protocol_instance = self
        self.factory.base_client._connected_event.set()


class _WebSocketClientFactory(WebSocketClientFactory):
    def __init__(self, *args, **kwargs):
        WebSocketClientFactory.__init__(self, *args, **kwargs)
        self.protocol_instance = None
        self.base_client = None

    def buildProtocol(self, addr):
        return _WebSocketClientProtocol(self)
# ------ end twisted -------


class BaseWBClient(object):
    def __init__(self, websocket_settings):
        self.settings = websocket_settings
        # instance to be set by the own factory
        self.factory = None
        # this event will be triggered on onOpen()
        self._connected_event = threading.Event()
        # queue to hold not yet dispatched messages
        self._send_queue = Queue.Queue()
        self._reactor_thread = None

    def connect(self):
        log.debug("Connecting to %(host)s:%(port)d" % self.settings)
        self.factory = _WebSocketClientFactory(
            "ws://%(host)s:%(port)d" % self.settings,
            debug=True)
        self.factory.base_client = self
        connectWS(self.factory)
        # run the reactor in a background thread; False disables the
        # signal handlers, which can only be installed in the main thread
        self._reactor_thread = threading.Thread(target=reactor.run,
                                                args=(False,))
        self._reactor_thread.daemon = True
        self._reactor_thread.start()

    def send_message(self, body):
        if not self._check_connection():
            return
        log.debug("Queueing send")
        self._send_queue.put(body)
        # hand the actual send over to the reactor thread
        reactor.callFromThread(self._dispatch)

    def _check_connection(self):
        if not self._connected_event.wait(timeout=10):
            log.error("Unable to connect to server")
            self.close()
            return False
        return True

    def _dispatch(self):
        # runs in the reactor thread: drain everything queued so far
        log.debug("Dispatching")
        while True:
            try:
                body = self._send_queue.get(block=False)
            except Queue.Empty:
                break
            self.factory.protocol_instance.sendMessage(body)

    def close(self):
        reactor.callFromThread(reactor.stop)