I couldn't find a decent ThreadPool implementation for Ruby, so I wrote my own (based partly on code from here: http://web.archive.org/web/20081204101031/http://snippets.dzone.com:80/posts/show/3276 , but changed to use wait/signal and a different implementation of ThreadPool shutdown). However, after running for some time (with 100 threads handling about 1300 tasks), it dies with a deadlock on line 25, where it waits for a new job. Any ideas why this might happen?
require 'thread'
begin
  require 'fastthread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end
class ThreadPool
  class Worker
    def initialize(callback)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @callback = callback
      @mutex.synchronize {@running = true}
      @thread = Thread.new do
        while @mutex.synchronize {@running}
          block = get_block
          if block
            block.call
            reset_block
            # Signal the ThreadPool that this worker is ready for another job
            @callback.signal
          else
            # Wait for a new job
            @mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
          end
        end
      end
    end
    def name
      @thread.inspect
    end
    def get_block
      @mutex.synchronize {@block}
    end
    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end
    def reset_block
      @mutex.synchronize {@block = nil}
    end
    def busy?
      @mutex.synchronize {!@block.nil?}
    end
    def stop
      @mutex.synchronize {@running = false}
      # Signal the thread not to wait for a new job
      @cv.signal
      @thread.join
    end
  end
  attr_accessor :max_size
  def initialize(max_size = 10)
    @max_size = max_size
    @workers = []
    @mutex = Mutex.new
    @cv = ConditionVariable.new
  end
  def size
    @mutex.synchronize {@workers.size}
  end
  def busy?
    @mutex.synchronize {@workers.any? {|w| w.busy?}}
  end
  def shutdown
    @mutex.synchronize {@workers.each {|w| w.stop}}
  end
  alias :join :shutdown
  def process(block=nil,&blk)
    block = blk if block_given?
    while true
      @mutex.synchronize do
        worker = get_worker
        if worker
          return worker.set_block(block)
        else
          # Wait for a free worker
          @cv.wait(@mutex)
        end
      end
    end
  end
  # Used by workers to report ready status
  def signal
    @cv.signal
  end
  private
  def get_worker
    free_worker || create_worker
  end
  def free_worker
    @workers.each {|w| return w unless w.busy?}; nil
  end
  def create_worker
    return nil if @workers.size >= @max_size
    worker = Worker.new(self)
    @workers << worker
    worker
  end
end
OK, so the main problem with the implementation is: how do you make sure no signal is lost, and how do you avoid deadlocks?
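To make the "lost signal" concrete: in the question's Worker, the check (`get_block`) and the wait (`@cv.wait`) happen in two separate `synchronize` sections, so `set_block` can store a job and signal in between, and the worker then waits for a signal that has already been sent. The usual condition-variable idiom is to test the predicate and wait inside the same critical section, in a loop. Below is a minimal sketch of that idiom only; `JobSlot` and its methods are hypothetical names for illustration, not part of the code above.

```ruby
require 'thread'

# Hypothetical single-slot hand-off between one producer and one consumer.
class JobSlot
  def initialize
    @mutex = Mutex.new
    @cv = ConditionVariable.new
    @job = nil
    @closed = false
  end

  # Store a job and wake the consumer. The state change and the signal
  # happen under the same mutex the consumer holds while checking.
  def put(job)
    @mutex.synchronize do
      raise "slot already full" if @job
      @job = job
      @cv.signal
    end
  end

  # Mark the slot closed so a waiting consumer can exit.
  def close
    @mutex.synchronize do
      @closed = true
      @cv.signal
    end
  end

  # Block until a job is available or the slot is closed.
  # Returns the job, or nil once the slot is closed and empty.
  def take
    @mutex.synchronize do
      # Check and wait under one lock: a job stored before we get here
      # makes the predicate false, so we never wait for a past signal.
      @cv.wait(@mutex) while @job.nil? && !@closed
      job, @job = @job, nil
      job
    end
  end
end

# Usage: the job is put before the consumer is guaranteed to be waiting,
# yet it is still picked up.
slot = JobSlot.new
results = Queue.new
consumer = Thread.new do
  while (job = slot.take)
    results << job.call
  end
end
slot.put(-> { 6 * 7 })
slot.close
consumer.join
```

The loop around `wait` also covers spurious wake-ups. Getting every such check-and-wait pair right on both the worker side and the pool side is where hand-written pools tend to go wrong.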
In my experience, this is REALLY hard to achieve with condition variables and mutexes, but easy with semaphores. It so happens that Ruby implements an object called Queue (or SizedQueue) that should solve the problem. Here is my suggested implementation:
require 'thread'
begin
  require 'fastthread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end
class ThreadPool
  class Worker
    def initialize(thread_queue)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @queue = thread_queue
      @running = true
      @thread = Thread.new do
        @mutex.synchronize do
          while @running
            # Wait only while there is nothing to do, so that a job set
            # before this thread starts waiting is not missed
            @cv.wait(@mutex) while @running && @block.nil?
            block = get_block
            if block
              # Release the mutex while the job runs
              @mutex.unlock
              block.call
              @mutex.lock
              reset_block
            end
            # Report this worker as available again
            @queue << self
          end
        end
      end
    end
    def name
      @thread.inspect
    end
    def get_block
      @block
    end
    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end
    def reset_block
      @block = nil
    end
    def busy?
      @mutex.synchronize { !@block.nil? }
    end
    def stop
      @mutex.synchronize do
        @running = false
        @cv.signal
      end
      @thread.join
    end
  end
  attr_accessor :max_size
  def initialize(max_size = 10)
    @max_size = max_size
    @queue = Queue.new
    @workers = []
  end
  def size
    @workers.size
  end
  def busy?
    @queue.size < @workers.size
  end
  def shutdown
    @workers.each { |w| w.stop }
    @workers = []
  end
  alias :join :shutdown
  def process(block=nil,&blk)
    block = blk if block_given?
    worker = get_worker
    worker.set_block(block)
  end
  private
  def get_worker
    # Reuse an idle worker if one is queued, or block until one is free
    # once the pool has reached its maximum size
    if !@queue.empty? or @workers.size == @max_size
      return @queue.pop
    else
      worker = Worker.new(@queue)
      @workers << worker
      worker
    end
  end
end
And here is some simple test code:
tp = ThreadPool.new 500
(1..1000).each do |i|
  tp.process do
    (2..10).inject(1) { |memo, val| sleep(0.1); memo * val }
    print "Computation #{i} done. Nb of tasks: #{tp.size}\n"
  end
end
tp.shutdown
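For comparison, here is a smaller sketch of the same Queue idea taken one step further: instead of queueing idle workers, the jobs themselves go through a `Queue` and a fixed set of threads pop from it. This is a different design from the implementation above (threads are created eagerly, and there is no `busy?`), and `QueuePool` and `STOP` are hypothetical names chosen for illustration. It relies only on `Queue#pop` blocking until an item is available, so a job pushed before a worker starts waiting simply stays in the queue.

```ruby
require 'thread'

# Hypothetical minimal pool: jobs travel through a Queue, so the pool
# itself needs no explicit Mutex or ConditionVariable.
class QueuePool
  STOP = Object.new # sentinel telling a worker thread to exit

  def initialize(size)
    @jobs = Queue.new
    @threads = Array.new(size) do
      Thread.new do
        # Pop jobs until the sentinel arrives. An exception raised by a
        # job would end this worker; error handling is left out here.
        until (job = @jobs.pop).equal?(STOP)
          job.call
        end
      end
    end
  end

  # Enqueue a block to be run by whichever worker becomes free first.
  def process(&block)
    @jobs << block
  end

  # Let queued jobs finish, then stop and join every worker.
  def shutdown
    @threads.size.times { @jobs << STOP }
    @threads.each(&:join)
  end
end

# Usage: 20 small jobs on 4 threads; results are collected in a Queue.
pool = QueuePool.new(4)
done = Queue.new
20.times { |i| pool.process { done << i * i } }
pool.shutdown
squares = Array.new(done.size) { done.pop }.sort
```

Because the sentinels are pushed after all jobs, every job queued before `shutdown` is processed before the workers exit.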
You can try the work_queue gem, designed to coordinate work between a producer and a pool of worker threads.