I have a method in a Ruby On Rails application that I wanted to run concurrently. The method is supposed to create a zip file containing reports from the site, where each file in the zip is a PDF. The conversion from html to PDF is somewhat slow, thus the desire to multi-thread.
I wanted to use 5 threads, so I figured I would have a shared Enumerator between the threads. Each thread would pop a value from the Enumerator, and run do stuff to it. Here's how I was thinking it would work:
t = Zip::OutputStream::write_buffer do |z|
mutex = Mutex.new
gen = Enumerator.new{ |g|
Report.all.includes("employee" => ["boss", "client"], "projects" => {"project_owner" => ["project_team"]}).find_each do |report|
g.yield report
end
}
5.times.map {
Thread.new do
begin
loop do
mutex.synchronize do
@report = gen.next
end
title = @report.title + "_" + @report.id.to_s
title += ".pdf" unless title.end_with?(".pdf")
pdf = PDFKit.new(render_to_string(:template => partial_url, locals: {array: [@report]},
:layout => false)).to_pdf
mutex.synchronize do
z.put_next_entry(title)
z.write(pdf)
end
end
rescue StopIteration
# do nothing
end
end
}.each {|thread| thread.join }
end
When I ran the above code, I got the following error:
FiberError at /generate_report
fiber called across threads
After some searching, I came across this post, which recommended that I use a Queue instead of an Enumerator, because Queues are thread safe, while Enumerators are not. While this might be reasonable for non-Rails applications, this is impractical for me.
The nice thing about Rails 4 ActiveRecord is that it doesn't load queries until they are iterated over. And, if you use a method like find_each
to iterate over it, it does it in batches of 1000, so you never have to store an entire table in ram all at once. The results from query I'm using: Report.all.includes("employee" => ["boss", "client"], "projects" => {"project_owner" => ["project_team"]})
is large. Very large. And I need to be able to load it on the fly, rather than doing something like:
gen = Report.all.includes("employee" => ["boss", "client"], "projects" => {"project_owner" => ["project_team"]}).map(&queue.method(:push))
Which will load that entire query into ram.
Is there a thread-safe way of doing this:
gen = Enumerator.new{ |g|
Report.all.includes(...).find_each do |report|
g.yield report
end
}
So that I can pop data from gen
across multiple threads, without having to load my entire Report
(and all of the includes) table into ram?
Multithreading is the ability to execute code on multiple concurrent threads. Each thread exists within a process, and each process can have at least one thread.
Unfortunately, Ruby doesn't ship with any thread-safe Array or Hash implementations. The core Array and Hash classes are not thread-safe by default, nor should they be.
Modules and classes are also objects, and defining a singleton method on a object actually defines it on its singleton class. With that said, and since you have already established instance variable access is thread safe, examples 2 and 3 are thread safe.
If you start the worker threads before filling up the queue, they will start consuming the queue as you fill it up, and because as a rule of thumb - network is slower than CPU, each batch should be (mostly) consumed by the time the next batch arrives:
queue = Queue.new
t1 = Thread.new do
while !queue.empty?
p queue.pop(true)
sleep(0.1)
end
end
t2 = Thread.new do
while !queue.empty?
p queue.pop(true)
sleep(0.1)
end
end
(0..1000).map(&queue.method(:push))
t1.join
t2.join
If that proves too slow still, you can opt to use SizedQueue
, which will block the push
if the queue reaches a big enough size:
queue = SizedQueue.new(100)
t1 = Thread.new do
while !queue.empty?
p "#{queue.pop(true)} - #{queue.size}"
sleep(0.1)
end
end
t2 = Thread.new do
while !queue.empty?
p queue.pop(true)
sleep(0.1)
end
end
(0..300).map(&queue.method(:push))
t1.join
t2.join
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With