Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Sending JSON data over WebSocket from Matlab using Python Twisted and Autobahn

I'm trying to create a connection from Matlab to stream JSON frames over a WebSocket. I've tested my python installation of autobahn and twisted using the following.

Working Example

Matlab Code

This sample driver uses the JSONlab toolbox to convert Matlab data to JSON, then compresses and Base64-encodes the result. Since I haven't gotten RPC to work from Matlab, I pass the message on the command line, where the compression and Base64 encoding are needed to avoid line-length and shell-escaping issues.

% Driver script: in an endless loop, serialise a random 100x100 matrix to
% JSON, compress and Base64-encode it, and pass the result to
% broadcast_client.py as a command-line argument.
clear all
close all

% Statements without a trailing semicolon echo their value to the console.
python = '/usr/local/bin/python'
bc = '/Users/palmerc/broadcast_client.py'
% Iteration counter; it is only incremented, never read, in this script.
i = uint32(1)

encoder = org.apache.commons.codec.binary.Base64
while true
    tic;
    packet = rand(100, 100);
    json_packet = uint8(savejson('', packet));
    compressed = CompressLib.compress(json_packet);
    b64 = char(encoder.encode(compressed));
    % Command line of the form "<python> <client script> <base64 payload>".
    message = sprintf('%s %s %s', python, bc, b64);
    % A new Python process is launched for every packet; its exit status is
    % stored but not checked.
    status = system(message);

    i = i + 1;
    % tic/toc report the elapsed time of each iteration.
    toc;
end

Broadcast Client Code

The client code has two ways of being called. You can pass your message through the command-line or create an instance of BroadcastClient and call sendMessage.

#!/usr/bin/env python

import sys
from twisted.internet import reactor
from txjsonrpc.web.jsonrpc import Proxy


class BroadcastClient():
    """JSON-RPC client that forwards one message to the broadcast server.

    Wraps a txjsonrpc ``Proxy`` and calls the server's ``broadcastMessage``
    remote method.  The caller starts the Twisted reactor; the client stops
    it again once the remote call has finished, successfully or not.
    """

    def __init__(self, server=None):
        """Create the JSON-RPC proxy.

        :param server: URL of the JSON-RPC endpoint, passed to ``Proxy``.
        """
        self.proxy = Proxy(server)

    def errorMessage(self, value):
        """Report a failed remote call on stdout.

        :param value: the failure handed to the errback.
        """
        # One pre-formatted string prints the same text whether ``print`` is
        # the Python 2 statement or the Python 3 function.
        print('Error  {}'.format(value))

    def sendMessage(self, message):
        """Call ``broadcastMessage`` on the server with ``message``.

        The call is asynchronous: nothing is sent until the reactor runs.
        Failures are reported through ``errorMessage``.

        :param message: payload passed as the single RPC argument.
        """
        rc = self.proxy.callRemote('broadcastMessage', message)
        rc.addErrback(self.errorMessage)
        # Stop the reactor after a failure as well as after success.  If it
        # were stopped only on success, a failed call (for example, server
        # not running) would leave reactor.run() blocked forever and hang
        # the calling process.
        rc.addBoth(lambda _: reactor.stop())


def main(cli_arguments):
    """Send the first command-line argument to the broadcast server.

    :param cli_arguments: argument vector in ``sys.argv`` form; element 1 is
        the message.  With no message argument the function does nothing.
    """
    if len(cli_arguments) <= 1:
        return

    client = BroadcastClient('http://127.0.0.1:7080/')
    client.sendMessage(cli_arguments[1])
    # Blocks until the client stops the reactor.
    reactor.run()

# Run as a script: hand the raw command-line arguments to main().
if __name__ == '__main__':
    main(sys.argv)

Broadcast Server Code

The server provides a JSON-RPC endpoint on port 7080, serves the web client on port 8080, and exposes a WebSocket on port 9000 using TXJSONRPC, Twisted, and Autobahn. The Autobahn Web Client is useful for debugging and should be placed in the same directory as the server code.

#!/usr/bin/env python

import sys

from twisted.internet import reactor
from twisted.python import log
from twisted.web.server import Site
from twisted.web.static import File
from txjsonrpc.web import jsonrpc

from autobahn.twisted.websocket import WebSocketServerFactory, \
    WebSocketServerProtocol, \
    listenWS


class BroadcastServerProtocol(WebSocketServerProtocol):
    """Per-connection protocol that registers itself with the factory and
    relays incoming text messages to every registered client."""

    def onOpen(self):
        # Make this connection a broadcast recipient.
        self.factory.registerClient(self)

    def onMessage(self, payload, isBinary):
        # Binary frames are ignored; only text frames are relayed.
        if isBinary:
            return
        text = payload.decode('utf8')
        self.factory.broadcastMessage("{} from {}".format(text, self.peer))

    def connectionLost(self, reason):
        # Run the base-class teardown first, then drop this connection from
        # the factory's recipient list.
        WebSocketServerProtocol.connectionLost(self, reason)
        self.factory.unregisterClient(self)


class BroadcastServerFactory(WebSocketServerFactory):

    """
    Broadcast server factory: tracks the connected clients and relays each
    message it is given to all of them.
    """

    def __init__(self, url, debug=False, debugCodePaths=False):
        WebSocketServerFactory.__init__(
            self, url, debug=debug, debugCodePaths=debugCodePaths)
        # Currently connected protocol instances, in registration order.
        self.clients = []

    def registerClient(self, client):
        """Add ``client`` to the recipient list unless already present."""
        if client in self.clients:
            return
        print("registered client {}".format(client.peer))
        self.clients.append(client)

    def unregisterClient(self, client):
        """Remove ``client`` from the recipient list if it is present."""
        if client not in self.clients:
            return
        print("unregistered client {}".format(client.peer))
        self.clients.remove(client)

    def broadcastMessage(self, message):
        """UTF-8 encode ``message`` and send it to every registered client."""
        print("broadcasting message '{}' ..".format(message))
        for recipient in self.clients:
            recipient.sendMessage(message.encode('utf8'))
            print("message sent to {}".format(recipient.peer))


class BroadcastPreparedServerFactory(BroadcastServerFactory):

    """
    Same broadcast behaviour as the parent factory, but the message is
    prepared once with prepareMessage and the prepared form is sent to each
    client with sendPreparedMessage.
    """

    def broadcastMessage(self, message):
        """Broadcast ``message`` as a text frame to every registered client."""
        print("broadcasting prepared message '{}' ..".format(message))
        payload = message.encode('utf8')
        prepared = self.prepareMessage(payload, isBinary=False)
        for recipient in self.clients:
            recipient.sendPreparedMessage(prepared)
            print("prepared message sent to {}".format(recipient.peer))


class MatlabClient(jsonrpc.JSONRPC):
    """JSON-RPC resource that hands received messages to a WebSocket
    broadcast factory."""

    # Factory to broadcast through.  It is assigned after construction;
    # while it is None, incoming messages are silently dropped.
    factory = None

    def jsonrpc_broadcastMessage(self, message):
        """Handle the ``broadcastMessage`` remote call.

        :param message: text to broadcast to the WebSocket clients.
        """
        if self.factory is not None:
            # broadcastMessage returns nothing, so its result is not printed
            # (doing so only wrote "None" to stdout for every message).
            self.factory.broadcastMessage(message)


if __name__ == '__main__':

    # Passing "debug" as the first argument turns on Twisted logging to
    # stdout and the factory's debug flags.
    debug = len(sys.argv) > 1 and sys.argv[1] == 'debug'
    if debug:
        log.startLogging(sys.stdout)

    # WebSocket endpoint.
    factory = BroadcastPreparedServerFactory(
        u"ws://127.0.0.1:9000", debug=debug, debugCodePaths=debug)
    factory.protocol = BroadcastServerProtocol
    listenWS(factory)

    # JSON-RPC endpoint that feeds received messages into the factory.
    matlab = MatlabClient()
    matlab.factory = factory
    reactor.listenTCP(7080, Site(matlab))

    # Static file server for the current directory.
    reactor.listenTCP(8080, Site(File(".")))

    reactor.run()

The Problem - Failed Attempts

First, a note: if you have trouble getting Python working from Matlab, make sure Matlab is pointing at the correct version of Python on your system. Check it with the pyversion command and, if necessary, correct it with pyversion('/path/to/python').

Matlab can't run reactor

% Failed attempt: call the Python BroadcastClient directly from Matlab and
% run the Twisted reactor inside the send loop.
clear all
close all

i = uint32(1)

% NOTE(review): `encoder` and `bc` are used below but are not defined in this
% snippet (and `clear all` runs first). Presumably `encoder` is the Base64
% encoder from the earlier script and `bc` is a BroadcastClient created via
% Matlab's py. interface; confirm.
while true
    tic;
    packet = rand(100, 100);
    json_packet = uint8(savejson('', packet));
    compressed = CompressLib.compress(json_packet);
    b64 = char(encoder.encode(compressed));
    % b64 is transposed before being converted to a Python str.
    bc.sendMessage(py.str(b64.'));
    % The reactor is started on every iteration of the loop.
    py.twisted.internet.reactor.run % This won't work.

    i = i + 1;
    toc;
end

Matlab POST

Another attempt involved using Matlab's webwrite to POST to the server. Turns out webwrite will convert data to JSON simply by passing the correct weboptions.

% POST a struct holding a random 100x100 matrix with webwrite; the
% 'application/json' media type requests JSON encoding of the data.
% NOTE(review): `server` is not defined in this snippet; it must hold the URL
% to post to.
options = weboptions('MediaType', 'application/json');
data = struct('Matrix', rand(100, 100));
webwrite(server, data, options);

This worked, but it turns out to be slow (~0.1 seconds per message). I should mention that the matrix is not the real data I'm sending; the real data serializes to about 280000 bytes per message, but this provides a reasonable approximation.

How can I call bc.sendMessage so that it correctly manages to get reactor to run or solve this issue in another, faster way?

like image 252
Cameron Lowell Palmer Avatar asked Dec 18 '15 14:12

Cameron Lowell Palmer


1 Answers

Setting up a WebSocket using Python and Matlab

Check Matlab is pointing at the correct version of python

First, you need to make sure you're using the correct python binary. On Mac you might be using the system standard version instead of the one that Homebrew installed for example. Check the location of your python install using:

% Show the Python installation Matlab is currently using.
pyversion

You can point Matlab to the correct version using:

% Point Matlab at a specific Python binary (replace the placeholder path).
pyversion('path/to/python')

This may require you to restart Matlab.

As stated above, I'm using Twisted to multiplex my Matlab data to the WebSocket clients. The best way I have found to solve this problem is simply to create a server that handles POSTs and passes the data along to the WebSocket clients. Compression just slowed things down, so I send 280 kB of JSON per request, which takes roughly 0.05 seconds per message. I would like this to be faster (0.01 seconds), but this is a good start.

Matlab Code

% Endless loop that serialises a random 100x100 matrix to JSON and POSTs it
% to the server's update.json resource through Python's requests library.
server = 'http://127.0.0.1:7080/update.json';
% Request headers, built as a Python dict.
headers = py.dict(pyargs('Charset','UTF-8','Content-Type','application/json'));
while true
    tic;
    packet = rand(100, 100);
    % The JSON text is sent as-is: no compression or Base64 step here.
    json_packet = savejson('', packet);
    % pyargs passes 'data' and 'headers' as Python keyword arguments; the
    % response r is not inspected.
    r = py.requests.post(server, pyargs('data', json_packet, 'headers', headers));
    % tic/toc report the elapsed time of each iteration.
    toc;
end

I could have used the Matlab webwrite function, but generally I find calling out to python to be more flexible.

Python WebSocket-WebClient Server

import sys

from twisted.internet import reactor
from twisted.python import log
from twisted.web.resource import Resource
from twisted.web.server import Site
from twisted.web.static import File

from autobahn.twisted.websocket import WebSocketServerFactory, \
    WebSocketServerProtocol, \
    listenWS


class BroadcastServerProtocol(WebSocketServerProtocol):
    """WebSocket connection handler: joins the factory's client list when
    opened, leaves it when the connection is lost, and rebroadcasts any text
    message it receives."""

    def onOpen(self):
        self.factory.registerClient(self)

    def onMessage(self, payload, isBinary):
        # Only text frames are rebroadcast; binary frames are dropped.
        if isBinary:
            return
        decoded = payload.decode('utf8')
        tagged = "{} from {}".format(decoded, self.peer)
        self.factory.broadcastMessage(tagged)

    def connectionLost(self, reason):
        # Base-class cleanup runs before this connection is unregistered.
        WebSocketServerProtocol.connectionLost(self, reason)
        self.factory.unregisterClient(self)


class BroadcastServerFactory(WebSocketServerFactory):
    """WebSocket factory that keeps a list of connected clients and sends
    each broadcast message to all of them."""

    def __init__(self, url, debug=False, debugCodePaths=False):
        WebSocketServerFactory.__init__(
            self, url, debug=debug, debugCodePaths=debugCodePaths)
        # Connected protocol instances, in registration order.
        self.clients = []

    def registerClient(self, client):
        """Start broadcasting to ``client``; duplicates are ignored."""
        if client in self.clients:
            return
        print("registered client {}".format(client.peer))
        self.clients.append(client)

    def unregisterClient(self, client):
        """Stop broadcasting to ``client``; unknown clients are ignored."""
        if client not in self.clients:
            return
        print("unregistered client {}".format(client.peer))
        self.clients.remove(client)

    def broadcastMessage(self, message):
        """Send ``message``, UTF-8 encoded, to every registered client."""
        for recipient in self.clients:
            recipient.sendMessage(message.encode('utf8'))


class BroadcastPreparedServerFactory(BroadcastServerFactory):
    """Broadcast factory that prepares each message once with prepareMessage
    and sends the prepared form to every client with sendPreparedMessage."""

    def broadcastMessage(self, message, isBinary=False):
        """Broadcast ``message`` to all registered clients.

        :param message: payload to send; either bytes (sent unchanged) or a
            text string (UTF-8 encoded first).
        :param isBinary: passed through to ``prepareMessage`` to select a
            binary or a text frame.
        """
        # Encode based on the payload's type, not on isBinary.  Encoding
        # only when isBinary was true had the test backwards: raw binary
        # data would be pushed through a text encode, while unicode text
        # was never encoded at all.  Byte payloads, such as a POST body
        # read from the request, pass through untouched.
        if not isinstance(message, bytes):
            message = message.encode('utf8')
        preparedMessage = self.prepareMessage(message, isBinary=isBinary)
        for client in self.clients:
            client.sendPreparedMessage(preparedMessage)


class WebClient(Resource):
    """HTTP resource that forwards the body of every POST request to the
    WebSocket broadcast factory."""

    # Broadcast factory; must be assigned before the first POST arrives.
    webSocket = None

    def render_POST(self, request):
        """Broadcast the request body and answer with a fixed 'OK'."""
        body = request.content.read()
        self.webSocket.broadcastMessage(body)
        return 'OK'


if __name__ == '__main__':

    # "debug" as the first argument turns on Twisted logging to stdout and
    # the factory's debug flags.
    debug = len(sys.argv) > 1 and sys.argv[1] == 'debug'
    if debug:
        log.startLogging(sys.stdout)

    # WebSocket endpoint.
    factory = BroadcastPreparedServerFactory(
        u"ws://127.0.0.1:9000", debug=debug, debugCodePaths=debug)
    factory.protocol = BroadcastServerProtocol
    listenWS(factory)

    # HTTP endpoint on 7080: bodies POSTed to /update.json are broadcast
    # through the WebSocket factory.
    webClient = WebClient()
    webClient.webSocket = factory
    root = Resource()
    root.putChild('update.json', webClient)
    reactor.listenTCP(7080, Site(root))

    # Static file server for the current directory.
    reactor.listenTCP(8080, Site(File(".")))

    reactor.run()

I got rid of the RPC attempt and just went with a straight POST. Still lots of opportunity for performance improvement.

like image 104
Cameron Lowell Palmer Avatar answered Nov 02 '22 11:11

Cameron Lowell Palmer