Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Subscribe to a queue, receive 1 message, and then unsubscribe

I have a scenario where I need to distribute and process jobs extremely quickly. I'll have about 45 jobs populated in the queue quickly and I can process about 20 simultaneously (5 machines, 4 cores each). Each job takes a variable amount of time, and to complicate matters garbage collection is an issue so I need to be able to take a consumer offline for garbage collection.

Currently, I have everything working with pop (every consumer pops every 5ms). This seems undesirable because it translates to 600 pop requests per second to rabbitmq.

I would love if it there were a pop command that would act like subscribe, but for only one message. (the process would block, waiting for input from the rabbitMQ connection, via something akin to Kernel.select)

I have tried to trick the AMQP gem to doing something like this, but it's not working: I can't seem to unsubscribe until the queue is empty and no more messages are being sent to the consumer. Other methods of unsubscribing I fear will lose messages.

consume_1.rb :

require "amqp"

EventMachine.run do
  puts "connecting..."
  connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
  puts "Connected to AMQP broker"

  channel = AMQP::Channel.new(connection)
  queue = channel.queue("tasks", :auto_delete => true)
  exchange = AMQP::Exchange.default(channel)

  queue.subscribe do |payload|
    puts "Received a message: #{payload}."
    queue.unsubscribe { puts "unbound" }
    sleep 3
  end
end

consumer_many.rb:

require "amqp"

# Imagine the command is something CPU - intensive like image processing.
command = "sleep 0.1"

EventMachine.run do
  puts "connecting..."
  connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
  puts "Connected to AMQP broker"

  channel = AMQP::Channel.new(connection)
  queue = channel.queue("tasks", :auto_delete => true)
  exchange = AMQP::Exchange.default(channel)

  queue.subscribe do |payload|
    puts "Received a message: #{payload}."
  end
end

producer.rb:

require "amqp"

i = 0
EventMachine.run do
  connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
  puts "Connected to AMQP broker"

  channel = AMQP::Channel.new(connection)
  queue = channel.queue("tasks", :auto_delete => true)
  exchange = AMQP::Exchange.default(channel)

  EM.add_periodic_timer(1) do
    msg = "Message #{i}"
    i+=1
    puts "~ publishing #{msg}"
  end
end

I'll launch consume_many.rb and producer.rb. Messages will flow as expected.

When I launch consume_1.rb, it gets every other message (as expected). But it NEVER unsubscribes because it never finishes processing all of its messages... so on it goes.

How do I get consume_1.rb to subscribe to the queue, get a single message, and then take itself out of the load-balancer ring so it can do it's work, without losing any additional pending jobs that might be in the queue and would otherwise be scheduled to be sent to the process?

Tim

like image 735
Tim Harper Avatar asked Apr 15 '11 23:04

Tim Harper


2 Answers

This is a simple, yet very poorly documented feature of the AMQP gem, what you need is this:

In your consumer:

channel = AMQP::Channel.new(connection, :prefetch => 1)

And then with your subscribe block, do:

queue.subscribe(:ack => true) do |queue_header, payload|
  puts "Received a message: #{payload}."
  # Do long running work here

  # Acknowledge message
  queue_header.ack
end

What this does it tells RabbitMQ to only send your consumer 1 message at a time, and not send another message until you call ack on the queue header after working on a long running task for a while.

I might need to be corrected on this, but I believe a direct exchange would be better suited to this task.

like image 67
Ivan Avatar answered Nov 11 '22 01:11

Ivan


With the environment that I have,

RabbitMQ version: 3.3.3

amqp gem version: 1.5.0

The solution from Ivan still resulted in all the messages being fetched from the queue.

Instead, the number of unacknowledged messages by subscribing to a queue can be limited by setting the QoS of a channel.

As per the API document of AMQP::Channel,

#qos(prefetch_size = 0, prefetch_count = 32, global = false, &block) ⇒ Object

One note for the method in that, if you are running RabbitMQ servers after version 2.3.6, prefetch_size is deprecated.

channel = AMQP::Channel.new(connection, :prefetch => 1)
channel.qos(0, 1)

queue = channel.queue(queue_name, :auto_delete => false)
queue.subscribe(:ack => true) do |metadata, payload|
  puts "Received a message: #{payload}."

  # Do long running work here

  # Acknowledge message
  metadata.ack
end

Hope the solution helps someone out.

Cheers.

like image 43
Jack Wu Avatar answered Nov 11 '22 01:11

Jack Wu