Let's say that I have 200 expensive method calls to make (each with different arguments). For some reason, I can execute 5 of those calls in parallel, but no more. I could execute them one at a time, but doing 5 at a time is 5 times faster.
I want to always be executing five things. I don't want to queue five, wait until all five are done, and then queue five more. If I queue A,B,C,D,E and C finishes first, I want to immediately replace it with F, even if A and B aren't done yet.
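A minimal sketch of that "always five in flight" behaviour. It assumes a hypothetical `expensive_call` (simulated with a short `sleep`) and Ruby 2.3+ for `Queue#close`. The 200 argument sets go into a Queue, and five long-lived worker threads each take the next item as soon as they finish their current one. A fast job is therefore replaced immediately instead of waiting for a whole batch of five to finish.

```ruby
# Sliding pool: 5 workers, each takes the next job as soon as it is free.
WORKERS = 5

# Hypothetical stand-in for the real expensive method.
def expensive_call(arg)
  sleep(rand * 0.01)
  arg * 2
end

jobs = Queue.new
200.times { |i| jobs << i }
jobs.close # no more jobs will be added; pop returns nil once the queue is drained

results = Queue.new
in_flight = 0
max_in_flight = 0
lock = Mutex.new

workers = Array.new(WORKERS) do
  Thread.new do
    while (arg = jobs.pop) # nil only after the closed queue is empty
      lock.synchronize do
        in_flight += 1
        max_in_flight = [max_in_flight, in_flight].max
      end
      results << expensive_call(arg)
      lock.synchronize { in_flight -= 1 }
    end
  end
end
workers.each(&:join)

puts "results: #{results.size}, max concurrent calls: #{max_in_flight}"
```

The pool size is fixed by the number of worker threads. The queue only holds the work that is waiting.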
I've been reading up on this problem, since it's something I can imagine happening on a regular basis. The solution seems to be the producer-consumer pattern, and Ruby has some structures built into its standard library for use with that pattern (Queue and SizedQueue). I've played around with code samples, read some of the documentation, and I think I have a rough understanding of it. But I have some questions, I'm not confident in my solution, and the whole area of multithreading is new ground for me, so I thought I'd ask here to make sure I'm not, you know, totally wrong and just lucky.
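For reference, here is a small sketch of the core Queue operations involved. `<<` (an alias of `push`) appends an item. `pop` removes items in FIFO order and suspends the caller while the queue is empty. `pop(true)` is the non-blocking form, which raises ThreadError instead of waiting.

```ruby
q = Queue.new
q << 'balloon'      # << is an alias of push
q.push('cake')

size_before = q.size # 2
first  = q.pop       # "balloon" (first in, first out)
second = q.pop       # "cake"

# pop(true) is the non-blocking form: on an empty queue it raises ThreadError
# instead of suspending the calling thread.
raised = begin
  q.pop(true)
  false
rescue ThreadError
  true
end

puts "#{size_before} #{first} #{second} empty=#{q.empty?} raised=#{raised}"
```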
So here's a test program I wrote:
q = Queue.new
q << 'balloon'
q << 'sandwich'
q << 'clown'
q << 'fairy floss'
q << 'ferris wheel'
q << 'magician'
q << 'cake'
q << 'present'
q << 'chip'
q << 'game'
q << 'animal'
consumer_1 = Thread.new do
  until q.empty?
    sleep rand(0..10)
    print "#{q.pop}\n"
  end
end
# consumer 2 and 3 are identical to consumer 1
[consumer_1, consumer_2, consumer_3].map(&:join)
The queue holds a list of things we need for a birthday party. 3 consumer threads work through the list.
This works. My questions are:
If it's the number of consumers that determines how many items are worked on in parallel, what's the point of having a sized queue?
Would a sized queue only be useful in situations where the tasks are infinite, unknown, or vast in number, and you want to pause before filling the queue up?
Have I implemented this incorrectly? Creating multiple threads by hand and then calling join on them seems slightly messy. Is there a better solution?
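One caveat about the test program: `until q.empty?` followed by `q.pop` is a check-then-act race. With one item left, two consumers can both see a non-empty queue, go to sleep, and then compete for the last item. The loser blocks in `pop` forever, so `join` never returns (Ruby typically aborts with a deadlock error once every thread is stuck). The queue is fully loaded before the consumers start, so one way around this is the non-blocking `pop(true)`, which raises ThreadError instead of waiting. The sketch below uses that, builds the consumers with `map` instead of by hand, and does the (shortened) simulated work after the item is taken.

```ruby
q = Queue.new
['balloon', 'sandwich', 'clown', 'fairy floss', 'ferris wheel', 'magician',
 'cake', 'present', 'chip', 'game', 'animal'].each { |item| q << item }

done = Queue.new # collects what the consumers handled

consumers = 3.times.map do
  Thread.new do
    loop do
      item = begin
        q.pop(true) # never blocks: raises ThreadError if the queue is empty
      rescue ThreadError
        break       # nothing left, so this consumer stops
      end
      sleep rand(0.0..0.05) # stand-in for real work
      done << item
      print "#{item}\n"
    end
  end
end
consumers.each(&:join)
```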
A SizedQueue prevents a producer from adding items faster than the consumers can consume them.
SizedQueue#push
If there is no space left in the queue, waits until space becomes available
Queue#pop / SizedQueue#pop
If the queue is empty, the calling thread is suspended until data is pushed onto the queue.
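To see the blocking push described above in isolation, here is a small sketch using a SizedQueue with room for two items. A producer thread stalls on the third push until the main thread pops something. `push(obj, true)` is the non-blocking variant and raises ThreadError when the queue is full. The `sleep` is only a crude way to give the producer time to reach the blocked state.

```ruby
sq = SizedQueue.new(2)
puts sq.max # capacity: 2

sq << :a
sq << :b

# Non-blocking push on a full queue raises instead of waiting.
full = begin
  sq.push(:c, true)
  false
rescue ThreadError
  true
end

# A blocking push from another thread waits until space frees up.
producer = Thread.new { sq << :c }
sleep 0.1
status_while_full = producer.status # normally "sleep": the thread is blocked in push

sq.pop        # removes :a, freeing a slot and waking the producer
producer.join
puts "full=#{full} status=#{status_while_full} size=#{sq.size}"
```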
SizedQueue vs Queue
require 'thread'
require 'logger'
queue = SizedQueue.new(3)
#queue = Queue.new # goes berserk: unbounded, so the producer never waits
logger = Logger.new(STDOUT)
Thread.new do
  item = 0
  loop do
    item += 1
    queue << item
    logger.info "#{item} produced"
  end
end
consumers = 2.times.map do
  Thread.new do
    loop do
      item = queue.pop
      logger.info "consumed #{item}"
      sleep item
    end
  end
end
consumers.each(&:join)
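The demo above runs forever. Below is a finite variant that records the largest backlog the producer ever builds up. It is a sketch: the item count and sleep time are arbitrary, and it assumes Ruby 2.3+ for `close`. With `SizedQueue.new(3)` the backlog cannot exceed 3. With an unbounded `Queue.new` the producer would typically race ahead by most of its items.

```ruby
queue = SizedQueue.new(3)
ITEMS = 50
max_backlog = 0

producer = Thread.new do
  1.upto(ITEMS) do |item|
    queue << item                               # blocks while 3 items are waiting
    max_backlog = [max_backlog, queue.size].max
  end
  queue.close                                   # tell consumers nothing more is coming
end

consumed = Queue.new
consumers = 2.times.map do
  Thread.new do
    while (item = queue.pop)                    # nil once closed and drained
      sleep 0.001                               # simulated slow consumer
      consumed << item
    end
  end
end

producer.join
consumers.each(&:join)
puts "consumed #{consumed.size} items, max backlog #{max_backlog}"
```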
How to stop the worker threads
require 'thread'
require 'logger'
queue = Queue.new
logger = Logger.new(STDOUT)
consumers_count = 5
end_object = BasicObject.new # unique end marker ("poison pill"); can't collide with real work items
consumers = consumers_count.times.map do
  Thread.new do
    until (item = queue.pop) == end_object
      logger.info "consumed #{item}"
    end
  end
end
1000.times { |item| queue << item }
consumers_count.times { queue << end_object } # one end marker per consumer
consumers.each(&:join)
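On Ruby 2.3 or later, `Queue#close` is an alternative to pushing one end marker per consumer. After `close`, `pop` keeps returning the remaining items and then returns `nil` once the queue is empty, so each worker can simply loop until it gets `nil`. A sketch of the same 5-consumer example on that basis (it assumes `nil` and `false` are never real work items, since either would end the loop):

```ruby
require 'logger'

queue = Queue.new
logger = Logger.new($stdout)
consumers_count = 5
consumed = Queue.new

consumers = consumers_count.times.map do
  Thread.new do
    # pop blocks while the queue is open and empty; returns nil once closed and drained
    while (item = queue.pop)
      consumed << item
      logger.info "consumed #{item}"
    end
  end
end

1000.times { |item| queue << item }
queue.close # one call replaces the per-consumer end markers
consumers.each(&:join)
```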