Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Can I use a Request / Reply - RPC pattern in Rails 3 with AMQP?

For reasons similar to the ones in this discussion, I'm experimenting with messaging in lieu of REST for a synchronous RPC call from one Rails 3 application to another. Both apps are running on thin.

The "server" application has a config/initializers/amqp.rb file based on the Request / Reply pattern in the rubyamqp.info documentation:

require "amqp"

EventMachine.next_tick do
  connection = AMQP.connect ENV['CLOUDAMQP_URL'] || 'amqp://guest:guest@localhost'
  channel    = AMQP::Channel.new(connection)

  requests_queue = channel.queue("amqpgem.examples.services.time", :exclusive => true, :auto_delete => true)
  requests_queue.subscribe(:ack => true) do |metadata, payload|
    puts "[requests] Got a request #{metadata.message_id}. Sending a reply..."
    channel.default_exchange.publish(Time.now.to_s,
                                     :routing_key    => metadata.reply_to,
                                     :correlation_id => metadata.message_id,
                                     :mandatory      => true)
    metadata.ack
  end

  Signal.trap("INT") { connection.close { EventMachine.stop } }
end

In the 'client' application, I'd like to render the results of a synchronous call to the 'server' in a view. I realize this is a bit outside the comfort zone of an inherently asynchronous library like the amqp gem, but I'm wondering if there's a way to make it work. Here is my client config/initializers/amqp.rb:

require 'amqp'

EventMachine.next_tick do
  AMQP.connection = AMQP.connect 'amqp://guest:guest@localhost'  
  Signal.trap("INT") { AMQP.connection.close { EventMachine.stop } }
end

Here is the controller:

require "amqp"

class WelcomeController < ApplicationController
  def index    
    puts "[request] Sending a request..."

    WelcomeController.channel.default_exchange.publish("get.time",
      :routing_key => "amqpgem.examples.services.time",
      :message_id  => Kernel.rand(10101010).to_s,
      :reply_to    => WelcomeController.replies_queue.name)

    WelcomeController.replies_queue.subscribe do |metadata, payload|
      puts "[response] Response for #{metadata.correlation_id}: #{payload.inspect}"
      @message = payload.inspect
    end   
  end

  def self.channel
    @channel ||= AMQP::Channel.new(AMQP.connection)
  end

  def self.replies_queue
    @replies_queue ||= channel.queue("reply", :exclusive => true, :auto_delete => true)
  end
end

When I start both applications on different ports and visit the welcome#index view. @message is nil in the view, since the result has not yet returned. The result arrives a few milliseconds after the view is rendered and is displayed on the console:

$ thin start
>> Using rack adapter
>> Thin web server (v1.5.0 codename Knife)
>> Maximum connections set to 1024
>> Listening on 0.0.0.0:3000, CTRL+C to stop
[request] Sending a request...
[response] Response for 3877031: "2012-11-27 22:04:28 -0600"

No surprise here: subscribe is clearly not meant for synchronous calls. What is surprising is that I can't find a synchronous alternative in the AMQP gem source code or in any documentation online. Is there an alternative to subscribe that will give me the RPC behavior I want? Given that there are other parts of the system in which I'd want to use legitimately asynchronous calls, the bunny gem didn't seem like the right tool for the job. Should I give it another look?

edit in response to Sam Stokes

Thanks to Sam for the pointer to throw :async / async.callback. I hadn't seen this technique before and this is exactly the kind of thing I was trying to learn with this experiment in the first place. send_response.finish is gone in Rails 3, but I was able to get his example to work for at least one request with a minor change:

render :text => @message
rendered_response = response.prepare!

Subsequent requests fail with !! Unexpected error while processing request: deadlock; recursive locking. This may have been what Sam was getting at with the comment about getting ActionController to allow concurrent requests, but the cited gist only works for Rails 2. Adding config.allow_concurrency = true in development.rb gets rid of this error in Rails 3, but leads to This queue already has default consumer. from AMQP.

I think this yak is sufficiently shaven. ;-)

While interesting, this is clearly overkill for simple RPC. Something like this Sinatra streaming example seems a more appropriate use case for client interaction with replies. Tenderlove also has a blog post about an upcoming way to stream events in Rails 4 that could work with AMQP.

As Sam points out in his discussion of the HTTP alternative, REST / HTTP makes perfect sense for the RPC portion of my system that involves two Rails apps. There are other parts of the system involving more classic asynchronous event publishing to Clojure apps. For these, the Rails app need only publish events in fire-and-forget fashion, so AMQP will work fine there using my original code without the reply queue.

like image 355
Bobby Norton Avatar asked Nov 28 '12 04:11

Bobby Norton


1 Answers

You can get the behaviour you want - have the client make a simple HTTP request, to which your web app responds asynchronously - but you need more tricks. You need to use Thin's support for asynchronous responses:

require "amqp"

class WelcomeController < ApplicationController
  def index
    puts "[request] Sending a request..."

    WelcomeController.channel.default_exchange.publish("get.time",
      :routing_key => "amqpgem.examples.services.time",
      :message_id  => Kernel.rand(10101010).to_s,
      :reply_to    => WelcomeController.replies_queue.name)

    WelcomeController.replies_queue.subscribe do |metadata, payload|
      puts "[response] Response for #{metadata.correlation_id}: #{payload.inspect}"
      @message = payload.inspect

      # Trigger Rails response rendering now we have the message.
      # Tested in Rails 2.3; may or may not work in Rails 3.x.
      rendered_response = send_response.finish

      # Pass the response to Thin and make it complete the request.
      # env['async.callback'] expects a Rack-style response triple:
      # [status, headers, body]
      request.env['async.callback'].call(rendered_response)
    end

    # This unwinds the call stack, skipping the normal Rails response
    # rendering, all the way back up to Thin, which catches it and
    # interprets as "I'll give you the response later by calling
    # env['async.callback']".
    throw :async
  end

  def self.channel
    @channel ||= AMQP::Channel.new(AMQP.connection)
  end

  def self.replies_queue
    @replies_queue ||= channel.queue("reply", :exclusive => true, :auto_delete => true)
  end
end

As far as the client is concerned, the result is indistinguishable from your web app blocking on a synchronous call before returning the response; but now your web app can process many such requests concurrently.

CAUTION!

Async Rails is an advanced technique; you need to know what you're doing. Some parts of Rails do not take kindly to having their call stack abruptly dismantled. The throw will bypass any Rack middlewares that don't know to catch and rethrow it (here is a rather old partial solution). ActiveSupport's development-mode class reloading will reload your app's classes after the throw, without waiting for the response, which can cause very confusing breakage if your callback refers to a class that has since been reloaded. You'll also need to ask ActionController nicely to allow concurrent requests.

Request/response

You're also going to need to match up requests and responses. As it stands, if Request 1 arrives, and then Request 2 arrives before Request 1 gets a response, then it's undefined which request would receive Response 1 (messages on a queue are distributed round-robin between the consumers subscribed to the queue).

You could do this by inspecting the correlation_id (which you'll have to explicitly set, by the way - RabbitMQ won't do it for you!) and re-enqueuing the message if it's not the response you were waiting for. My approach was to create a persistent Publisher object which would keep track of open requests, listen for all responses, and lookup the appropriate callback to invoke based on the correlation_id.

Alternative: just use HTTP

You're really solving two different (and tricky!) problems here: persuading Rails/thin to process requests asynchronously, and implementing request-response semantics on top of AMQP's publish-subscribe model. Given you said this is for calling between two Rails apps, why not just use HTTP, which already has the request-response semantics you need? That way you only have to solve the first problem. You can still get concurrent request processing if you use a non-blocking HTTP client library, such as em-http-request.

like image 70
Sam Stokes Avatar answered Oct 20 '22 06:10

Sam Stokes