
Thread safe Enumerator in Ruby

TLDR: Is there a thread-safe version of the Enumerator class in Ruby?


What I'm trying to do:

I have a method in a Ruby on Rails application that I want to run concurrently. The method creates a zip file containing reports from the site, where each file in the zip is a PDF. The conversion from HTML to PDF is somewhat slow, hence the desire to multi-thread.

How I expected to do it:

I wanted to use 5 threads, so I figured I would share one Enumerator between them. Each thread would pop a value from the Enumerator and process it. Here's how I was thinking it would work:

t = Zip::OutputStream::write_buffer do |z|
  mutex = Mutex.new
  gen = Enumerator.new{ |g|
    Report.all.includes("employee" => ["boss", "client"], "projects" => {"project_owner" => ["project_team"]}).find_each do |report|
      g.yield report
    end
  }
  5.times.map {
    Thread.new do
      begin
        loop do
          mutex.synchronize  do
            @report = gen.next
          end
          title = @report.title + "_" + @report.id.to_s
          title += ".pdf" unless title.end_with?(".pdf")
          pdf = PDFKit.new(render_to_string(:template => partial_url, locals: {array: [@report]},
                                            :layout => false)).to_pdf
          mutex.synchronize  do
            z.put_next_entry(title)
            z.write(pdf)
          end
        end
      rescue StopIteration
        # do nothing
      end
    end
  }.each {|thread| thread.join }
end

What happened when I tried it:

When I ran the above code, I got the following error:

FiberError at /generate_report
fiber called across threads

After some searching, I came across this post, which recommended that I use a Queue instead of an Enumerator, because Queues are thread safe, while Enumerators are not. While this might be reasonable for non-Rails applications, this is impractical for me.
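The error can be reproduced outside Rails. The following is a minimal sketch, assuming a plain Ruby interpreter (MRI): `Enumerator#next` drives the block through an internal Fiber, and that Fiber belongs to the thread that first called `next`. The variable names are illustrative only.

```ruby
# An external enumerator over three values.
gen = Enumerator.new { |g| 3.times { |i| g.yield i } }

# The first call to next creates the internal Fiber on the main thread.
first = gen.next

# A second thread now tries to resume the Fiber owned by the main thread.
error = Thread.new do
  begin
    gen.next
    nil
  rescue FiberError => e
    e # hand the exception back so it can be inspected
  end
end.value

puts first
puts error.class
puts error.message
```

Wrapping `gen.next` in a Mutex does not help here, because the problem is which thread owns the Fiber, not simultaneous access.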

Why I can't just use a Queue:

The nice thing about Rails 4 ActiveRecord is that it doesn't run a query until the results are iterated over. And if you iterate with a method like find_each, it loads the records in batches of 1000, so you never have to hold an entire table in RAM at once. The result of the query I'm using, Report.all.includes("employee" => ["boss", "client"], "projects" => {"project_owner" => ["project_team"]}), is large. Very large. I need to be able to load it on the fly, rather than doing something like:

gen = Report.all.includes("employee" => ["boss", "client"], "projects" => {"project_owner" => ["project_team"]}).map(&queue.method(:push))

That would load the entire result set into RAM.

Finally the question:

Is there a thread-safe way of doing this:

gen = Enumerator.new{ |g|
        Report.all.includes(...).find_each do |report|
          g.yield report
        end
}

So that I can pop data from gen across multiple threads, without having to load my entire Report table (and all of the includes) into RAM?

Ephraim asked Sep 11 '15 00:09


1 Answer

If you start the worker threads before filling the queue, they will consume items while you are still pushing them. As a rule of thumb the network (here, the database fetch) is slower than the CPU, so each batch should be mostly consumed by the time the next batch arrives:

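queue = Queue.new
done = :done  # sentinel that tells a worker to stop

# Start the workers first. A blocking pop waits for the producer,
# so a worker does not exit just because the queue is momentarily empty.
workers = 2.times.map do
  Thread.new do
    until (item = queue.pop) == done
      p item
      sleep(0.1)
    end
  end
end

(0..1000).each { |i| queue.push(i) }
workers.size.times { queue.push(done) }  # one sentinel per worker

workers.each(&:join)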

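If the workers still cannot keep up and the queue grows too large, you can use SizedQueue instead. Its push blocks whenever the queue already holds the maximum number of items, so the producer never gets far ahead of the consumers: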

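queue = SizedQueue.new(100)
done = :done  # sentinel that tells a worker to stop

workers = 2.times.map do
  Thread.new do
    # Blocking pop: wait for the next item rather than quitting when empty.
    until (item = queue.pop) == done
      p "#{item} - #{queue.size}"
      sleep(0.1)
    end
  end
end

# push blocks while 100 items are already waiting
(0..300).each { |i| queue.push(i) }
workers.size.times { queue.push(done) }  # one sentinel per worker
workers.each(&:join)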
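Applied to the situation in the question, the SizedQueue approach might look like the sketch below. It is self-contained and runs without Rails, so several pieces are stand-ins: each_record is a hypothetical substitute for Report...find_each that yields records one batch at a time, the integers stand for report records, and the string built in the worker stands for the PDF rendering step. In the real code, writes to the zip stream would still need the mutex from the question.

```ruby
# Hypothetical stand-in for find_each: yields records batch by batch,
# so only one batch exists in memory at a time.
def each_record(total:, batch_size:)
  (1..total).each_slice(batch_size) do |batch|
    batch.each { |id| yield id }
  end
end

worker_count = 5
stop = Object.new            # unique sentinel that tells a worker to exit
queue = SizedQueue.new(20)   # push blocks once 20 records are waiting
results = Queue.new          # thread-safe place to collect the output

workers = worker_count.times.map do
  Thread.new do
    # Blocking pop: the worker sleeps until a record or the sentinel arrives.
    until (record = queue.pop).equal?(stop)
      results << "report_#{record}.pdf"  # placeholder for the slow PDF work
    end
  end
end

# The producer runs on the calling thread, so the record source is only
# ever touched by one thread and no Enumerator#next is involved.
each_record(total: 100, batch_size: 10) { |record| queue.push(record) }
worker_count.times { queue.push(stop) }  # one sentinel per worker
workers.each(&:join)

titles = Array.new(results.size) { results.pop }
puts titles.size
```

Because push blocks when the queue is full, at most 20 records plus the batch currently being iterated are held at once, regardless of how large the table is.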
Uri Agassi answered Oct 06 '22 00:10