
Sending RabbitMQ messages via websockets

I'm looking for code samples that solve the following problem:

I would like to write some code (Python or JavaScript) that acts as a subscriber to a RabbitMQ queue and, on receiving a message, broadcasts it via websockets to every connected client.

I've looked at Autobahn and Node.js (using the "amqp" and "ws" modules) but cannot get things to work as needed. Here's the server code in JavaScript for Node.js:

var amqp = require('amqp');
var WebSocketServer = require('ws').Server

var connection = amqp.createConnection({host: 'localhost'});
var wss = new WebSocketServer({port:8000});

wss.on('connection',function(ws){

    // The socket is already open when 'connection' fires, so greet the client here;
    // a server-side socket does not emit 'open'.
    console.log('connected');
    ws.send(Date.now().toString());

    ws.on('message',function(message){
            console.log('Received: %s',message);
            ws.send(Date.now().toString());
    });
});

connection.on('ready', function(){
    connection.queue('MYQUEUE', {durable:true,autoDelete:false},function(queue){
            console.log(' [*] Waiting for messages. To exit press CTRL+C')
            queue.subscribe(function(msg){
                    var payload = msg.data.toString('utf-8');
                    console.log(" [x] Received from MYQUEUE %s", payload);
                    // HOW DOES THIS NOW GET SENT VIA WEBSOCKETS ??
            });
    });
});

Using this code, I can subscribe to a queue in RabbitMQ and receive any messages sent to it. I can also connect a websocket client (e.g. a browser) to the server and send and receive messages. But how can I send the payload of the RabbitMQ message as a websocket message at the point indicated ("HOW DOES THIS NOW GET SENT VIA WEBSOCKETS")? I think I'm stuck in the wrong callback, or the callbacks need to be nested somehow.

Alternatively, if this can be done more easily in Python (via Autobahn and pika), that would be great.

Thanks!

asked Apr 04 '14 12:04 by bzo




1 Answer

One way to implement this is to use Python with Tornado.

Here is the server:

import tornado.ioloop
import tornado.web
import tornado.websocket
import pika
from threading import Thread

# Currently connected websocket clients.
clients = []

# IOLoop of the main thread; set in __main__ before the consumer thread starts.
io_loop = None


def threaded_rmq():
    # Blocking RabbitMQ consumer; runs in its own thread.
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    print('Connected:localhost')
    channel = connection.channel()
    channel.queue_declare(queue="my_queue")
    print('Consumer ready, on my_queue')
    # pika >= 1.0 signature; older releases used
    # basic_consume(consumer_callback, queue="my_queue", no_ack=True).
    channel.basic_consume(queue="my_queue",
                          on_message_callback=consumer_callback,
                          auto_ack=True)
    channel.start_consuming()


def consumer_callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))
    # This runs on the consumer thread, so hand each write to the IOLoop
    # thread; add_callback is safe to call from other threads.
    for itm in list(clients):
        io_loop.add_callback(itm.write_message, body)


class SocketHandler(tornado.websocket.WebSocketHandler):
    def open(self):
        print("WebSocket opened")
        clients.append(self)

    def on_message(self, message):
        self.write_message(u"You said: " + message)

    def on_close(self):
        print("WebSocket closed")
        clients.remove(self)


class MainHandler(tornado.web.RequestHandler):
    def get(self):
        print("get page")
        self.render("websocket.html")


application = tornado.web.Application([
    (r'/ws', SocketHandler),
    (r"/", MainHandler),
])

if __name__ == "__main__":
    io_loop = tornado.ioloop.IOLoop.current()
    # Daemon thread, so the process can exit on CTRL+C.
    thread = Thread(target=threaded_rmq, daemon=True)
    thread.start()

    application.listen(8889)
    io_loop.start()
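
The server above consumes from RabbitMQ in one thread while the websockets live on the event loop of another, so each message has to be handed across safely. The following is a minimal sketch of that handoff using only the standard library (asyncio instead of Tornado, and a plain list instead of a real queue and real sockets). The names consume, broadcast_all and outbox are hypothetical and chosen only for illustration. In Tornado the corresponding call would be IOLoop.add_callback, which its documentation describes as safe to call from other threads.

```python
import asyncio
import threading


def consume(loop, outbox, messages):
    """Stand-in for the blocking RabbitMQ consumer thread (hypothetical)."""
    for body in messages:
        # Do not touch event-loop objects directly from this thread;
        # schedule the work on the loop instead.
        loop.call_soon_threadsafe(outbox.put_nowait, body)
    # Sentinel telling the loop side that no more messages will arrive.
    loop.call_soon_threadsafe(outbox.put_nowait, None)


async def broadcast_all(messages):
    loop = asyncio.get_running_loop()
    outbox = asyncio.Queue()
    delivered = []  # stands in for the connected websocket clients
    worker = threading.Thread(target=consume, args=(loop, outbox, messages))
    worker.start()
    while True:
        body = await outbox.get()
        if body is None:
            break
        # A real server would write to each websocket client here.
        delivered.append(body)
    worker.join()
    return delivered


result = asyncio.run(broadcast_all([b"one", b"two"]))
print(result)
```

The consumer thread only ever schedules callbacks; every operation on the sockets happens on the thread that runs the event loop.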

And here is the HTML page:

<html>
<head>
    <script src="//code.jquery.com/jquery-1.11.0.min.js"></script>
    <script>

    $(document).ready(function() {
      var ws;
       if ('WebSocket' in window) {
           ws = new WebSocket('ws://localhost:8889/ws');
        }
        else if ('MozWebSocket' in window) {
            ws = new MozWebSocket('ws://localhost:8889/ws');
        }
        else {

              alert("Your browser doesn't support WebSockets");

            return;
        }

      ws.onopen = function(evt) { alert("Connection open ...")};

      ws.onmessage = function(evt){
        alert(evt.data);
      };

      function closeConnect(){
          ws.close();
      }


  });
    </script>

</head>

</html>

So when you publish a message to "my_queue", it is forwarded to every connected web page.

I hope this is useful.
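
A broadcast to every connected page usually also has to cope with clients that went away without a clean close. Below is a minimal sketch of one way to do that, assuming each client exposes a write_message method as Tornado handlers do. broadcast and FakeClient are hypothetical names introduced for this illustration, and FakeClient only simulates a socket so the example runs without a server.

```python
def broadcast(clients, body):
    """Send body to every client and forget the ones whose send fails."""
    dead = []
    for client in list(clients):  # copy: the list may change while we iterate
        try:
            client.write_message(body)
        except Exception:  # e.g. the socket was already closed
            dead.append(client)
    for client in dead:
        clients.remove(client)
    return len(clients)  # number of clients still connected


class FakeClient:
    """Hypothetical stand-in for a websocket handler, used only for this demo."""

    def __init__(self, closed=False):
        self.closed = closed
        self.received = []

    def write_message(self, body):
        if self.closed:
            raise RuntimeError("socket closed")
        self.received.append(body)


alive, gone = FakeClient(), FakeClient(closed=True)
clients = [gone, alive]
remaining = broadcast(clients, "hello")
print(remaining, alive.received)
```

One failing client then no longer prevents the remaining clients from receiving the message.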

EDIT: You can find the complete example at https://github.com/Gsantomaggio/rabbitmqexample

answered Oct 12 '22 20:10 by Gabriele Santomaggio