Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to start bunny thread in Rails and Thin

I'm integrating Bunny gem for RabbitMQ with Rails, should I start Bunny thread in an initializer that Rails starts with application start or do it in a separate rake task so I can start it in a separate process ?

I think if I'm producing messages only then I need to do it in Rails initializer so it can be used allover the app, but if I'm consuming I should do it in a separate rake task, is this correct ?

like image 895
Moustafa Samir Avatar asked May 03 '14 21:05

Moustafa Samir


1 Answers

You are correct: you should not be consuming from the Rails application itself. The Rails application should be a producer, in which case, an initializer is the correct place to start the Bunny instance.

I essentially have this code in my Rails applications which publish messages to RabbitMQ:

# config/initializers/bunny.rb
MESSAGING_SERVICE = MessagingService.new(ENV.fetch("AMQP_URL"))
MESSAGING_SERVICE.start

# app/controllers/application_controller.rb
class ApplicationController
  def messaging_service
    MESSAGING_SERVICE
  end
end

# app/controllers/uploads_controller.rb
class UploadsController < ApplicationController
  def create
    # save the model
    messaging_service.publish_resize_image_request(model.id)
    redirect_to uploads_path
  end
end

# lib/messaging_service.rb
class MessagingService
  def initialize(amqp_url)
    @bunny = Bunny.new(amqp_url)
    @bunny.start
    at_exit { @bunny.stop }
  end

  attr_reader :bunny

  def publish_resize_image_request(image_id)
    resize_image_exchange.publish(image_id.to_s)
  end

  def resize_image_exchange
    @resize_image_exchange ||=
      channel.exchange("resize-image", passive: true)
  end

  def channel
    @channel ||= bunny.channel
  end
end

For consuming messages, I prefer to start executables without Rake involved. Rake will fork a new process, which will use more memory.

# bin/image-resizer-worker
require "bunny"
bunny = Bunny.new(ENV.fetch("AMQP_URL"))
bunny.start
at_exit { bunny.stop }

channel = bunny.channel

# Tell RabbitMQ to send this worker at most 2 messages at a time
# Else, RabbitMQ will send us as many messages as we can absorb,
# which would be 100% of the queue. If we have multiple worker
# instances, we want to load-balance between each of them.
channel.prefetch(2)

exchange = channel.exchange("resize-image", type: :direct, durable: true)
queue = channel.queue("resize-image", durable: true)
queue.bind(exchange)
queue.subscribe(manual_ack: true, block: true) do |delivery_info, properties, payload|
  begin
    upload = Upload.find(Integer(payload))
    # somehow, resize the image and/or post-process the image

    # Tell RabbitMQ we processed the message, in order to not see it again
    channel.acknowledge(delivery_info.delivery_tag, false)

  rescue ActiveRecord::RecordNotFound => _
    STDERR.puts "Model does not exist: #{payload.inspect}"
    # If the model is not in the database, we don't want to see this message again
    channel.acknowledge(delivery_info.delivery_tag, false)

  rescue Errno:ENOSPC => e
    STDERR.puts "Ran out of disk space resizing #{payload.inspect}"
    # Do NOT ack the message, in order to see it again at a later date
    # This worker, or another one on another host, may have free space to
    # process the image.

  rescue RuntimeError => e
    STDERR.puts "Failed to resize #{payload}: #{e.class} - #{e.message}"
    # The fallback should probably be to ack the message.
    channel.acknowledge(delivery_info.delivery_tag, false)
  end
end

Given all that though, you may be better off with pre-built gems and using Rails' abstraction, ActiveJob.

like image 67
François Beausoleil Avatar answered Oct 10 '22 02:10

François Beausoleil