I have a scenario where I need to distribute and process jobs extremely quickly. I'll have about 45 jobs populated in the queue quickly and I can process about 20 simultaneously (5 machines, 4 cores each). Each job takes a variable amount of time, and to complicate matters garbage collection is an issue so I need to be able to take a consumer offline for garbage collection.
Currently, I have everything working with pop (every consumer pops every 5ms). This seems undesirable because it translates to 600 pop requests per second to rabbitmq.
I would love if it there were a pop command that would act like subscribe, but for only one message. (the process would block, waiting for input from the rabbitMQ connection, via something akin to Kernel.select)
I have tried to trick the AMQP gem to doing something like this, but it's not working: I can't seem to unsubscribe until the queue is empty and no more messages are being sent to the consumer. Other methods of unsubscribing I fear will lose messages.
consume_1.rb :
require "amqp"
EventMachine.run do
puts "connecting..."
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
queue.subscribe do |payload|
puts "Received a message: #{payload}."
queue.unsubscribe { puts "unbound" }
sleep 3
end
end
consumer_many.rb:
require "amqp"
# Imagine the command is something CPU - intensive like image processing.
command = "sleep 0.1"
EventMachine.run do
puts "connecting..."
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
queue.subscribe do |payload|
puts "Received a message: #{payload}."
end
end
producer.rb:
require "amqp"
i = 0
EventMachine.run do
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
EM.add_periodic_timer(1) do
msg = "Message #{i}"
i+=1
puts "~ publishing #{msg}"
end
end
I'll launch consume_many.rb and producer.rb. Messages will flow as expected.
When I launch consume_1.rb, it gets every other message (as expected). But it NEVER unsubscribes because it never finishes processing all of its messages... so on it goes.
How do I get consume_1.rb to subscribe to the queue, get a single message, and then take itself out of the load-balancer ring so it can do it's work, without losing any additional pending jobs that might be in the queue and would otherwise be scheduled to be sent to the process?
Tim
This is a simple, yet very poorly documented feature of the AMQP gem, what you need is this:
In your consumer:
channel = AMQP::Channel.new(connection, :prefetch => 1)
And then with your subscribe block, do:
queue.subscribe(:ack => true) do |queue_header, payload|
puts "Received a message: #{payload}."
# Do long running work here
# Acknowledge message
queue_header.ack
end
What this does it tells RabbitMQ to only send your consumer 1 message at a time, and not send another message until you call ack
on the queue header after working on a long running task for a while.
I might need to be corrected on this, but I believe a direct
exchange would be better suited to this task.
With the environment that I have,
RabbitMQ version: 3.3.3
amqp gem version: 1.5.0
The solution from Ivan still resulted in all the messages being fetched from the queue.
Instead, the number of unacknowledged messages by subscribing to a queue can be limited by setting the QoS of a channel.
As per the API document of AMQP::Channel,
#qos(prefetch_size = 0, prefetch_count = 32, global = false, &block) ⇒ Object
One note for the method in that, if you are running RabbitMQ servers after version 2.3.6, prefetch_size is deprecated.
channel = AMQP::Channel.new(connection, :prefetch => 1)
channel.qos(0, 1)
queue = channel.queue(queue_name, :auto_delete => false)
queue.subscribe(:ack => true) do |metadata, payload|
puts "Received a message: #{payload}."
# Do long running work here
# Acknowledge message
metadata.ack
end
Hope the solution helps someone out.
Cheers.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With