Implementing the producer-consumer pattern in Ruby

Let's say that I have 200 expensive method calls to make (each with different arguments). For some reason, I can execute 5 of those calls in parallel, but no more. I could execute them one at a time, but doing 5 at a time is 5 times faster.

I want to always be executing five things. I don't want to queue five, wait until all five are done, and then queue five more. If I queue A,B,C,D,E and C finishes first, I want to immediately replace it with F, even if A and B aren't done yet.

I've been reading up on this problem, since I can imagine it coming up regularly. The solution seems to be the producer-consumer pattern, and Ruby has some structures built into its standard library for use with that pattern (Queue and SizedQueue). I've played around with code samples, read some of the documentation, and I think I have a rough understanding of it. But I'm not confident in my solution, and the whole area of multithreading is new ground for me, so I thought I'd ask here to make sure I'm not, you know, totally wrong and just lucky.

So here's a test program I wrote:

q = Queue.new
q << 'balloon'
q << 'sandwich'
q << 'clown'
q << 'fairy floss'
q << 'ferris wheel'
q << 'magician'
q << 'cake'
q << 'present'
q << 'chip'
q << 'game'
q << 'animal'

consumer_1 = Thread.new do
  until q.empty?
    sleep rand(0..10)
    print "#{q.pop}\n"
  end
end

# consumer 2 and 3 are identical to consumer 1

[consumer_1, consumer_2, consumer_3].map(&:join)

The queue holds a list of things we need for a birthday party. 3 consumer threads work through the list.

This works, but I have some questions:

If it's the number of consumers that determines how many items are being worked in parallel, what's the point of having a sized queue?

Would a sized queue only be useful in situations where the tasks are infinite, unknown, or vast in number and you want to pause before filling the queue up?

Have I failed to implement the problem properly? Creating multiple threads by hand and then calling join on them seems slightly messy. Is there a better solution?

asked Apr 21 '15 03:04 by GreenTriangle


1 Answer

A SizedQueue prevents a producer from adding items faster than the consumers can consume them.

SizedQueue#push

If there is no space left in the queue, waits until space becomes available

Queue#pop / SizedQueue#pop

If the queue is empty, the calling thread is suspended until data is pushed onto the queue.
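
To see those two blocking conditions without actually hanging a thread, here is a minimal sketch that uses the non-blocking forms of push and pop, which raise ThreadError at the point where the plain calls would wait. The would_block? helper is a hypothetical name introduced only for this illustration.

```ruby
# Hypothetical helper: returns true when the non-blocking call raised
# ThreadError, i.e. when the blocking version would have had to wait.
def would_block?
  yield
  false
rescue ThreadError
  true
end

queue = SizedQueue.new(2)
queue << :a
queue << :b

# Full queue: a plain `queue << :c` would wait for a consumer to pop.
puts would_block? { queue.push(:c, true) }

queue.pop
queue.pop

# Empty queue: a plain `queue.pop` would suspend the calling thread.
puts would_block? { queue.pop(true) }
```

A plain Queue has no upper bound, so only the pop side can ever block there.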

SizedQueue vs Queue

require 'thread'
require 'logger'

queue = SizedQueue.new(3)
# queue = Queue.new # unbounded: the producer goes berserk
logger = Logger.new(STDOUT)

Thread.new do
  item = 0
  loop do
    item += 1
    queue << item
    logger.info "#{item} produced"
  end
end

consumers = 2.times.map do |i|
  Thread.new do
    loop do
      item = queue.pop
      logger.info "consumed #{item}"
      sleep item
    end
  end
end

consumers.each(&:join)
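
The demo above runs forever, so here is a finite variant that measures the effect instead, offered as a rough sketch. The max_backlog helper, its items parameter, and the sleep duration are all assumptions made for the illustration. It relies on Queue#close (Ruby 2.3 or later), after which pop returns nil once the queue is drained.

```ruby
# Hypothetical helper: pushes `items` integers through `queue` with one slow
# consumer and returns the largest backlog the producer observed.
def max_backlog(queue, items: 30)
  consumer = Thread.new do
    # pop returns nil once the queue has been closed and emptied
    while queue.pop
      sleep 0.002 # stand-in for slow work
    end
  end

  max_seen = 0
  1.upto(items) do |item|
    queue << item # waits here whenever a SizedQueue is full
    max_seen = [max_seen, queue.size].max
  end

  queue.close
  consumer.join
  max_seen
end

puts "SizedQueue(3) backlog: #{max_backlog(SizedQueue.new(3))}"
puts "Queue backlog:         #{max_backlog(Queue.new)}"
```

With the SizedQueue the backlog can never exceed 3. With the plain Queue it will typically climb close to the total item count, because nothing slows the producer down. So the number of consumers decides how much work runs in parallel, while the queue size only limits how much work is waiting.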

How to stop the worker threads

require 'thread'
require 'logger'

queue = Queue.new
logger = Logger.new(STDOUT)
consumers_count = 5

end_object = BasicObject.new

consumers = consumers_count.times.map do
  Thread.new do
    until (item = queue.pop) == end_object
      logger.info "consumed #{item}"
    end
  end
end

1000.times.each { |item| queue << item }
consumers_count.times { queue << end_object }

consumers.each(&:join)
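
Note that the `until q.empty?` loop in the question has a race: a consumer can pass the check, another consumer can take the last item, and the first one then waits in pop forever. On Ruby 2.3 or later, closing the queue is an alternative to a sentinel object, because pop returns nil once a closed queue is empty. Below is a sketch of that approach wrapped in a helper for the "200 calls, 5 at a time" case. The name map_in_parallel and its workers parameter are hypothetical, and the jobs must not be nil or false.

```ruby
# Hypothetical helper: calls the block once per job, with at most `workers`
# calls running at a time. Returns the results in the order of `jobs`.
def map_in_parallel(jobs, workers: 5, &block)
  queue = Queue.new
  jobs.each_with_index { |job, index| queue << [index, job] }
  queue.close # no more work is coming; pop returns nil once drained

  results = Array.new(jobs.size)
  threads = Array.new(workers) do
    Thread.new do
      # Each worker takes the next job as soon as it finishes the last one.
      while (entry = queue.pop)
        index, job = entry
        results[index] = block.call(job) # each slot is written by one thread
      end
    end
  end

  threads.each(&:join)
  results
end

squares = map_in_parallel((1..200).to_a) do |n|
  sleep 0.001 # stand-in for the expensive call
  n * n
end
puts squares.first(5).inspect
```

Because all jobs are known up front, a plain Queue is enough here. A SizedQueue only becomes useful when the producer could otherwise run far ahead of the workers.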

Further reading:

  • a writeup about Queue and SizedQueue
answered Sep 18 '22 11:09 by bliof