I am trying to build a simple chat application using AMQP, WebSockets and Ruby. I understand that this may not be the best use case for learning AMQP, but I would like to understand where I am going wrong.
The following is my amqp-server code:
require 'rubygems'
require 'amqp'
require 'mongo'
require 'em-websocket'
require 'json'

class MessageParser
  # message format => JSON, e.g. {"status":"message", "message":"test", "roomname":"Harry Potter"}
  def self.parse(message)
    parsed_message = JSON.parse(message)
    response = {}
    if parsed_message['status'] == 'status'
      response[:status] = 'STATUS'
      response[:username] = parsed_message['username']
      response[:roomname] = parsed_message['roomname']
    elsif parsed_message['status'] == 'message'
      response[:status] = 'MESSAGE'
      response[:message] = parsed_message['message']
      response[:roomname] = parsed_message['roomname'].split().join('_')
    end
    response
  end
end

class MongoManager
  def self.establish_connection(database)
    @db ||= Mongo::Connection.new('localhost', 27017).db(database)
    @db.collection('rooms')
    @db
  end
end

@sockets = []

EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  channel = AMQP::Channel.new(connection)
  puts "Connected to AMQP broker. #{AMQP::VERSION} "
  mongo = MongoManager.establish_connection("trackertalk_development")

  EventMachine::WebSocket.start(:host => '127.0.0.1', :port => 8080) do |ws|
    socket_detail = {:socket => ws}

    ws.onopen do
      @sockets << socket_detail
    end

    ws.onmessage do |message|
      status = MessageParser.parse(message)
      exchange = channel.fanout(status[:roomname].split().join('_'))
      if status[:status] == 'STATUS'
        queue = channel.queue(status[:username], :durable => true)
        unless queue.subscribed?
          puts "--------- SUBSCRIBED --------------"
          queue.bind(exchange).subscribe do |payload|
            puts "PAYLOAD : #{payload}"
            ws.send(payload)
          end
        else
          puts "----ALREADY SUBSCRIBED"
        end
        # only after 0.8.0rc14
        #queue = channel.queue(status[:username], :durable => true)
        #AMQP::Consumer.new(channel, queue)
      elsif status[:status] == 'MESSAGE'
        puts "********************* Message- published ******************************"
        exchange.publish(status[:message])
      end
    end

    ws.onclose do
      # remove the same hash that onopen stored
      @sockets.delete(socket_detail)
    end
  end
end
I use the status field to indicate whether the incoming message is a chat message or a status message that requires me to handle chores such as subscribing to the queue.
The problem I face is that when I send a message like
socket.send(JSON.stringify({status:'message', message:'test', roomname:'Harry Potter'}))
`exchange.publish` is called, but the message still doesn't get pushed to the browser via `ws.send`.
Is there something fundamentally wrong with my understanding of EventMachine and AMQP?
Here is the pastie for the same code http://pastie.org/private/xosgb8tw1w5vuroa4w7a
My code seems to work as desired when I remove :durable => true from queue = channel.queue(status[:username], :durable => true), but I fail to understand why that affects whether the message is delivered. Please ignore the Mongo part, as it does not play any part yet.
I would also like to know whether my approach to AMQP and its usage is correct.
The following is a snippet of my Rails view, which identifies the user's username and the roomname and sends them as part of a message via WebSockets.
<script>
  $(document).ready(function(){
    var username = '<%= @user.email %>';
    var roomname = 'Bazingaa';
    socket = new WebSocket('ws://127.0.0.1:8080/');
    socket.onopen = function(msg){
      console.log('connected');
      socket.send(JSON.stringify({status:'status', username:username, roomname:roomname}));
    }
    socket.onmessage = function(msg){
      $('#chat-log').append(msg.data);
    }
  });
</script>
<div class='block'>
  <div class='content'>
    <h2 class='title'><%= @room.name %></h2>
    <div class='inner'>
      <div id="chat-log">
      </div>
      <div id="chat-console">
        <textarea rows="5" cols="40"></textarea>
      </div>
    </div>
  </div>
</div>
<style>
  #chat-log{
    color:#000;
    font-weight:bold;
    margin-top:1em;
    width:900px;
    overflow:auto;
    height:300px;
  }
  #chat-console{
    bottom:10px;
  }
  textarea{
    width:100%;
    height:60px;
  }
</style>
I think your problem might be that the queue hangs around on the broker between invocations of ws.onmessage. When the client reconnects, the queue and binding already exist, so ws.send() doesn't get called.
By default, when you create a queue, it and any bindings it has hang around until the broker restarts or you explicitly tell the broker to delete it.
There are two ways to change this:
If you have control over the broker and you are using the RabbitMQ broker, an easy way to introspect what is happening on the broker is to install the management plugin, which provides a web interface to the exchanges, bindings and queues on the broker.
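The reconnect scenario described in this answer can be reproduced with a small in-memory model. This is only a sketch to illustrate the reasoning, not the amqp gem's API: ToyQueue, ToyBroker and handle_status are hypothetical names, plain arrays stand in for WebSocket connections, and the real broker and gem do far more than this. It assumes, as the question's handler does, that subscription is skipped whenever the queue already reports a subscriber.

```ruby
# Simplified in-memory stand-in for a queue; NOT the amqp gem API.
class ToyQueue
  def initialize
    @consumer = nil
  end

  def subscribed?
    !@consumer.nil?
  end

  def subscribe(&block)
    @consumer = block
  end

  def deliver(payload)
    @consumer.call(payload) if @consumer
  end
end

# Hypothetical registry that keeps queues alive by name, the way a declared
# queue stays around between client connections.
class ToyBroker
  def initialize
    @queues = {}
  end

  def queue(name)
    @queues[name] ||= ToyQueue.new
  end

  def delete(name)
    @queues.delete(name)
  end
end

# Mirrors the "unless queue.subscribed?" branch of the question's handler.
def handle_status(broker, username, socket)
  queue = broker.queue(username)
  queue.subscribe { |payload| socket << payload } unless queue.subscribed?
  queue
end

broker = ToyBroker.new
first_socket = []   # arrays stand in for WebSocket connections
second_socket = []

handle_status(broker, 'siddharth', first_socket)
# The browser reconnects; the queue still exists and still has its old consumer,
# so the new socket is never wired up.
handle_status(broker, 'siddharth', second_socket).deliver('hello')

# Explicitly deleting the queue first lets the next connection subscribe afresh.
broker.delete('siddharth')
third_socket = []
handle_status(broker, 'siddharth', third_socket).deliver('hello again')
```

In this model the payload sent after the reconnect lands in first_socket, because the only consumer block still closes over the first connection, while second_socket stays empty. Once the queue is deleted, the third connection subscribes and receives its message.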