Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

AMQP creating subscribing to queues dynamically

I am trying to build a simple chat application using AMQP, Websockets and Ruby. I understand that this may not be the best use-case to understand AMQP but I would like to understand where i am going wrong.

The following is my amqp-server code

require 'rubygems'
require 'amqp'
require 'mongo'
require 'em-websocket'
require 'json'

class MessageParser
  # message format => "room:harry_potter, nickname:siddharth, room:members"
  def self.parse(message)
    parsed_message = JSON.parse(message)

    response = {}
    if parsed_message['status'] == 'status'
      response[:status] = 'STATUS'
      response[:username] = parsed_message['username']
      response[:roomname] = parsed_message['roomname']
    elsif parsed_message['status'] == 'message'
      response[:status]   = 'MESSAGE'
      response[:message]  = parsed_message['message']
      response[:roomname] = parsed_message['roomname'].split().join('_')
    end

    response
  end
end

class MongoManager
  def self.establish_connection(database)
    @db ||= Mongo::Connection.new('localhost', 27017).db(database)
    @db.collection('rooms')

    @db
  end  
end


@sockets = []
EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  channel = AMQP::Channel.new(connection)

  puts "Connected to AMQP broker. #{AMQP::VERSION} "

  mongo = MongoManager.establish_connection("trackertalk_development")

  EventMachine::WebSocket.start(:host => '127.0.0.1', :port => 8080) do |ws|
    socket_detail = {:socket => ws}
    ws.onopen do 
      @sockets << socket_detail

    end

    ws.onmessage do |message|

      status  = MessageParser.parse(message)         
      exchange = channel.fanout(status[:roomname].split().join('_'))   

      if status[:status] == 'STATUS'               
         queue = channel.queue(status[:username], :durable => true)

        unless queue.subscribed? 
         puts "--------- SUBSCRIBED --------------"
         queue.bind(exchange).subscribe do |payload|
            puts "PAYLOAD :  #{payload}"
            ws.send(payload)
          end 
        else
          puts "----ALREADY SUBSCRIBED"
        end                  

        # only after 0.8.0rc14
        #queue = channel.queue(status[:username], :durable => true)      
        #AMQP::Consumer.new(channel, queue)        

      elsif status[:status] == 'MESSAGE'
        puts "********************* Message- published ******************************"
        exchange.publish(status[:message)  
      end                  
    end

    ws.onclose do 
      @sockets.delete ws
    end
  end    
end

I use the status to indicate whether the incoming message is a message for ongoing chat or for a status message requiring me to handle chores like subscribing to the queue.

The problem i face is that when I send a message like socket.send(JSON.stringify({status:'message', message:'test', roomname:'Harry Potter'}))

The exchange.publish' is called but it still doesn't get pushed via thews.send` to the browser.

Is there something fundamentally wrong with my understanding of EventMachine and AMQP?

Here is the pastie for the same code http://pastie.org/private/xosgb8tw1w5vuroa4w7a

My code seems to work as desired when i remove the durable => true from queue = channel.queue(status[:username], :durable => true)

The following is a snippet of my Rails view which identifies the user's username and the roomname and sends it as part of message via Websockets.

Though the code seems to work when i remove the durable => true I fail to understand why that affects the message being delivered. Please Ignore the mongo part of as it does not play any part yet.

I would also like to know if my approach to AMQP and its usage is correct

<script>
    $(document).ready(function(){
        var username = '<%= @user.email %>';
        var roomname = 'Bazingaa';

        socket = new WebSocket('ws://127.0.0.1:8080/');

        socket.onopen = function(msg){
            console.log('connected');
            socket.send(JSON.stringify({status:'status', username:username, roomname:roomname}));
        }

        socket.onmessage = function(msg){
            $('#chat-log').append(msg.data);

        }

    });

</script>
<div class='block'>
  <div class='content'>
    <h2 class='title'><%= @room.name %></h2>
    <div class='inner'>
      <div id="chat-log">
      </div>

      <div id="chat-console">
        <textarea rows="5" cols="40"></textarea>
      </div>
    </div>
  </div>
</div>

<style>
    #chat-log{
        color:#000;
        font-weight:bold;
        margin-top:1em;
        width:900px;
        overflow:auto;
        height:300px;
    }
    #chat-console{
        bottom:10px;
    }

    textarea{
        width:100%;
        height:60px;
    }
</style>
like image 785
Sid Avatar asked Jul 19 '11 12:07

Sid


1 Answers

I think your problem might be the queue hangs around on the broker between invocations of ws.onmessage. When the client reconnects the queue and binding already exists so ws.send() doesn't get called.

By default when you create a queue, it and any bindings it has, hangs around until the broker restarts, or you explicitly tell the broker to delete it.

There are two ways to change this:

  • Adding the durable flag when you create the queue, which will cause the queue to stick around even if the broker restarts
  • Adding the auto_delete flag, which will cause the broker to automatically delete the entity after a short amount of time of not being having a consumer attached to it

If you have control over the broker you are using the rabbitmq broker, an easy way to introspect what is happening on the broker is to install the management plugin, which provides a web interface to exchanges, bindings and queues on the broker.

like image 72
alanxz Avatar answered Nov 09 '22 23:11

alanxz