I'm looking for code samples to solve the following problem.
I would like to write some code (Python or JavaScript) that acts as a subscriber to a RabbitMQ queue so that, on receiving a message, it broadcasts that message via WebSockets to every connected client.
I've looked at Autobahn and Node.js (using the "amqp" and "ws" packages) but cannot get things to work as needed. Here's the server code in JavaScript using Node.js:
var amqp = require('amqp');
var WebSocketServer = require('ws').Server;
var connection = amqp.createConnection({host: 'localhost'});
var wss = new WebSocketServer({port: 8000});

wss.on('connection', function(ws) {
    ws.on('open', function() {
        console.log('connected');
        ws.send(Date.now().toString());
    });
    ws.on('message', function(message) {
        console.log('Received: %s', message);
        ws.send(Date.now().toString());
    });
});

connection.on('ready', function() {
    connection.queue('MYQUEUE', {durable: true, autoDelete: false}, function(queue) {
        console.log(' [*] Waiting for messages. To exit press CTRL+C');
        queue.subscribe(function(msg) {
            console.log(" [x] Received from MYQUEUE %s", msg.data.toString('utf-8'));
            var payload = msg.data.toString('utf-8');
            // HOW DOES THIS NOW GET SENT VIA WEBSOCKETS ??
        });
    });
});
Using this code, I can successfully subscribe to a queue in RabbitMQ and receive any messages that are sent to the queue. Similarly, I can connect a WebSocket client (e.g. a browser) to the server and send/receive messages. But how can I send the payload of the RabbitMQ message as a WebSocket message at the point indicated ("HOW DOES THIS NOW GET SENT VIA WEBSOCKETS")? I think I'm either stuck in the wrong callback or the two callbacks need to be nested somehow.
Alternatively, if this can be done more easily in Python (via Autobahn and pika), that would be great.
Thanks !
The plugin supports WebSockets with TLS (WSS) connections. See TLS guide to learn more about TLS support in RabbitMQ. The TLS listener port, server certificate file, private key and CA certificate bundle are mandatory options. Password is also mandatory if the private key uses one.
The AMQP WebSockets binding creates a tunnel over TCP port 443 that is then equivalent to AMQP 5671 connections. After setting up the connection and TLS, Service Bus offers two SASL mechanism options: SASL PLAIN is commonly used for passing username and password credentials to a server.
You can send raw binary data through the WebSocket. It's quite easy to manage. One option is to prepend a "magic byte" (an identifier that marks the message as non-JSON). For example, prepend binary messages with the B character.
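The "magic byte" idea can be sketched roughly as follows. This is only an illustration, assuming every frame travels as bytes and that non-binary frames are UTF-8 JSON; the helper names (`encode_binary`, `encode_json`, `decode`) are made up for this example and do not come from any library.

```python
import json

# Marker byte taken from the text above: binary frames start with "B".
# A JSON text never starts with "B", so the two kinds cannot be confused.
BINARY_MARKER = b"B"


def encode_binary(payload: bytes) -> bytes:
    """Prefix raw binary data with the marker byte."""
    return BINARY_MARKER + payload


def encode_json(obj) -> bytes:
    """Serialize a JSON-compatible object without any prefix."""
    return json.dumps(obj).encode("utf-8")


def decode(frame: bytes):
    """Return ("binary", bytes) or ("json", object) depending on the first byte."""
    if frame[:1] == BINARY_MARKER:
        return ("binary", frame[1:])
    return ("json", json.loads(frame.decode("utf-8")))
```

The receiver only has to look at the first byte, so no length field or extra envelope is needed.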
One way to implement your system is to use Python with Tornado.
Here is the server:
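import tornado.ioloop
import tornado.web
import tornado.websocket
import pika
from threading import Thread

# All currently connected WebSocket handlers.
clients = []

def broadcast(body):
    # Runs on the IOLoop thread, so it is safe to touch the handlers here.
    for itm in list(clients):
        itm.write_message(body)

def threaded_rmq(io_loop):
    # pika's BlockingConnection blocks, so it runs in its own thread.
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    print('Connected:localhost')
    channel = connection.channel()
    channel.queue_declare(queue="my_queue")
    print('Consumer ready, on my_queue')

    def consumer_callback(ch, method, properties, body):
        print(" [x] Received %r" % (body,))
        # Tornado is not thread-safe; add_callback is the call that may be
        # made from another thread, so hand the broadcast over to the IOLoop.
        io_loop.add_callback(broadcast, body)

    # pika >= 1.0 signature; older releases used
    # basic_consume(consumer_callback, queue="my_queue", no_ack=True)
    channel.basic_consume(queue="my_queue",
                          on_message_callback=consumer_callback,
                          auto_ack=True)
    channel.start_consuming()

class SocketHandler(tornado.websocket.WebSocketHandler):
    def open(self):
        print("WebSocket opened")
        clients.append(self)

    def on_message(self, message):
        self.write_message(u"You said: " + message)

    def on_close(self):
        print("WebSocket closed")
        clients.remove(self)

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        print("get page")
        self.render("websocket.html")

application = tornado.web.Application([
    (r'/ws', SocketHandler),
    (r"/", MainHandler),
])

if __name__ == "__main__":
    io_loop = tornado.ioloop.IOLoop.current()
    # daemon=True lets CTRL+C stop the process while the consumer is blocked.
    thread = Thread(target=threaded_rmq, args=(io_loop,), daemon=True)
    thread.start()
    application.listen(8889)
    io_loop.start()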
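Because the RabbitMQ consumer runs in its own thread while the WebSocket handlers live on the event loop, every message has to cross a thread boundary. The sketch below shows that handoff in isolation using only the standard library. It is an illustration, not part of the server: `FakeClient` is a hypothetical stand-in for a WebSocket handler, and asyncio's `call_soon_threadsafe` plays the role that Tornado's `IOLoop.add_callback` would play in a real server.

```python
import asyncio
import threading


class FakeClient:
    """Hypothetical stand-in for a WebSocket handler; records what it is sent."""

    def __init__(self):
        self.sent = []

    def write_message(self, body):
        self.sent.append(body)


def broadcast(clients, body):
    # Runs on the event-loop thread, so touching the clients is safe here.
    for client in list(clients):
        client.write_message(body)


def consumer_thread(loop, clients, bodies, done):
    # Stand-in for the RabbitMQ consumer callback running in a worker thread:
    # it never touches the clients itself, it only schedules work on the loop.
    for body in bodies:
        loop.call_soon_threadsafe(broadcast, clients, body)
    loop.call_soon_threadsafe(done.set)


async def demo():
    loop = asyncio.get_running_loop()
    clients = [FakeClient(), FakeClient()]
    done = asyncio.Event()
    worker = threading.Thread(
        target=consumer_thread,
        args=(loop, clients, [b"one", b"two"], done),
    )
    worker.start()
    await done.wait()  # scheduled callbacks run in order, so broadcasts finish first
    worker.join()
    return [client.sent for client in clients]


result = asyncio.run(demo())
```

The point of the pattern is that only the event-loop thread ever calls `write_message`; the consumer thread just enqueues callbacks.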
And here is the HTML page (websocket.html):
<html>
<head>
<script src="//code.jquery.com/jquery-1.11.0.min.js"></script>
<script>
$(document).ready(function() {
    var ws;
    if ('WebSocket' in window) {
        ws = new WebSocket('ws://localhost:8889/ws');
    }
    else if ('MozWebSocket' in window) {
        ws = new MozWebSocket('ws://localhost:8889/ws');
    }
    else {
        alert("Your browser doesn't support WebSockets");
        return;
    }
    ws.onopen = function(evt) { alert("Connection open ..."); };
    ws.onmessage = function(evt) {
        alert(evt.data);
    };
    function closeConnect() {
        ws.close();
    }
});
</script>
</head>
<body>
</body>
</html>
So when you publish a message to "my_queue", the message is forwarded to every connected web page.
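To try this out, you need something that publishes to "my_queue". A minimal sketch with pika might look like the following; the function names `publish` and `publish_to_rabbitmq` are made up for this example, and it assumes a broker on localhost with the default exchange and the same non-durable queue declaration as the server.

```python
def publish(channel, body, queue="my_queue"):
    """Declare the queue (same arguments as the consumer) and publish one message."""
    channel.queue_declare(queue=queue)
    # The default exchange ("") routes by queue name via routing_key.
    channel.basic_publish(exchange="", routing_key=queue, body=body)


def publish_to_rabbitmq(body, host="localhost", queue="my_queue"):
    """Open a blocking connection, publish one message, and close again."""
    # Imported here so that publish() can be exercised without pika or a broker.
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host))
    try:
        publish(connection.channel(), body, queue)
    finally:
        connection.close()
```

With the server running and a browser tab open on http://localhost:8889/, calling `publish_to_rabbitmq("hello")` should make the page show an alert containing the message.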
I hope this is useful.
EDIT: You can find the complete example at https://github.com/Gsantomaggio/rabbitmqexample