
JS-style async/non-blocking callback execution with Ruby, without heavy machinery like threads?

Tags:

ruby

I'm a frontend developer, somewhat familiar with Ruby. I only know how to write Ruby in a synchronous, sequential manner, while in JS I'm used to async, non-blocking callbacks.

Here's sample Ruby code:

results = []
rounds = 5

callback = ->(item) {
  # This imitates that the callback may take time to complete
  sleep rand(1..5)

  results.push item

  if results.size == rounds
    puts "All #{rounds} requests have completed! Here they are:", *results
  end
}

1.upto(rounds) { |item| callback.call(item) }

puts "Hello"

The goal is to have the callbacks run without blocking the main script. In other words, I want the "Hello" line to appear in the output above the "All 5 requests..." line. The callbacks should also run concurrently, so that the callback that finishes first makes it into the resulting array first.

With JavaScript, I would simply wrap the callback call in a setTimeout with zero delay:

setTimeout( function() { callback(item); }, 0);

This JS approach does not implement true multithreading or parallel execution. Under the hood, all the callbacks run in one thread sequentially, or rather interleaved at a low level.

But on a practical level it appears as concurrent execution: the resulting array is populated in the order in which the callbacks finish, i.e. it ends up sorted by the time each callback took.
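To make the single-threaded model concrete, here is a minimal Ruby sketch of a task queue that defers blocks and runs them later on the same thread. TaskQueue, defer, run and task_queue_demo are hypothetical names made up for this illustration; it is not how any JS engine or Ruby library is actually implemented.

```ruby
# Hypothetical single-threaded task queue (illustrative names only).
class TaskQueue
  def initialize
    @tasks = []
  end

  # Schedule a block to run later, roughly like setTimeout(fn, 0).
  def defer(&task)
    @tasks << task
    self
  end

  # Run queued tasks one at a time, in the order they were queued.
  # A task may queue further tasks while the loop is running.
  def run
    @tasks.shift.call until @tasks.empty?
  end
end

def task_queue_demo
  log = []
  queue = TaskQueue.new

  1.upto(3) { |item| queue.defer { log << "callback #{item}" } }

  log << "Hello" # the main code continues first
  queue.run      # the deferred callbacks run only now
  log
end

puts task_queue_demo
```

Note that a blocking call such as sleep inside one task would hold up every task behind it, so in this model tasks always finish in the order they were queued.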

Note that I only want the asynchronous feature of setTimeout(). I don't need its built-in delay (not to be confused with the sleep used in the callback example to imitate a time-consuming operation).

I asked around about how to do this JS-style async approach in Ruby and was advised to use one of the following:

  1. Multithreading. This is probably THE approach for Ruby, but it requires a substantial amount of scaffolding:

    1. Manually define an array for threads.
    2. Manually define a mutex.
    3. Start a new thread for each callback, add it to the array.
    4. Pass the mutex into each callback.
    5. Use mutex in the callback for thread synchronization.
    6. Ensure all threads are completed before program completion.

    Compared to JavaScript's setTimeout(), this is just too much. As I don't need true parallel execution, I don't want to build that much scaffolding every time I want to execute a proc asynchronously.

  2. A sophisticated Ruby library such as Celluloid or EventMachine. They look like they would take weeks to learn.

  3. A custom solution like this one (the author, apeiros@freenode, claims it to be very close to what setTimeout does under the hood). It requires almost no scaffolding and does not involve threads. But it seems to run callbacks synchronously, in the order they were queued.
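For reference, the thread scaffolding listed under option 1 can be sketched roughly as follows. run_with_threads is a hypothetical helper name, and the short sleep only imitates work; the numbered comments map to the six steps in that list.

```ruby
def run_with_threads(rounds)
  results = []
  threads = []      # 1. array for the threads
  mutex = Mutex.new # 2. mutex

  # 4. the mutex is passed into each callback
  callback = lambda do |item, lock|
    sleep((rounds - item) * 0.05)        # imitates work of varying length
    lock.synchronize { results << item } # 5. synchronized access to shared state
  end

  1.upto(rounds) do |item|
    # 3. one thread per callback, collected in the array
    threads << Thread.new { callback.call(item, mutex) }
  end

  yield if block_given? # the main code continues while the threads run

  threads.each(&:join)  # 6. wait for every thread before finishing
  results
end

order = run_with_threads(5) { puts "Hello" }
puts "All requests have completed: #{order.inspect}"
```

The order of the results depends on thread scheduling, so it can differ between runs.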

I have always considered Ruby the programming language closest to my ideal, and JS a poor man's programming language. It kinda discourages me that Ruby can't do something that is trivial in JS without involving heavy machinery.

So the question is: what is the simplest, most intuitive way to do async/non-blocking callbacks in Ruby, without involving complicated machinery like threads or complex libraries?

PS: If there is no satisfying answer during the bounty period, I will dig into #3 by apeiros and probably make it the accepted answer.

Andrey Mikhaylov - lolmaus asked Sep 30 '22 05:09



2 Answers

As people have said, it's not possible to achieve what you want without using threads or a library that abstracts their functionality. But if it's just the setTimeout functionality you want, then the implementation is actually very small.

Here's my attempt at emulating JavaScript's setTimeout in Ruby:

require 'thread'
require 'set'

module Timeout
  @timeouts = Set[]
  @exiting = false

  @exitm = Mutex.new
  @mutex = Mutex.new

  at_exit { wait_for_timeouts }

  def self.set(delay, &blk)
    thrd = Thread.start do
      sleep delay
      blk.call
      @exitm.synchronize do
        unless @exiting
          # Thread.current avoids reading `thrd` before it has been assigned.
          @mutex.synchronize { @timeouts.delete Thread.current }
        end
      end
    end

    @mutex.synchronize { @timeouts << thrd }
  end

  def self.wait_for_timeouts
    @exitm.synchronize { @exiting = true }
    @timeouts.each(&:join)
    @exitm.synchronize { @exiting = false }
  end
end

Here's how to use it:

$results = []
$rounds = 5

mutex = Mutex.new
def callback(n, mutex)
  -> {
    sleep rand(1..5)

    mutex.synchronize {
      $results << n
      puts "Fin: #{$results}" if $results.size == $rounds
    }
  }
end

1.upto($rounds) { |i| Timeout.set(0, &callback(i, mutex)) }

puts "Hello"

This outputs:

Hello
Fin: [1, 2, 3, 5, 4]

As you can see, the way you use it is essentially the same; the only thing I've changed is that I've added a mutex to prevent race conditions on the results array.

Aside: Why we need the mutex in the usage example

Even if JavaScript only runs on a single core, that alone does not prevent race conditions; what matters is whether operations are atomic. Pushing to an array is not an atomic operation, so it is executed as more than one instruction.

  • Suppose it takes two instructions: putting the element at the end (SET) and incrementing the size (INC).
  • Consider all the ways two pushes can be interleaved (taking symmetry into account):
    • SET1 INC1 SET2 INC2
    • SET1 SET2 INC1 INC2
  • The first one is what we want, but the second one results in the second append overwriting the first.
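The same idea can be sketched in Ruby with a shared counter instead of an array. This is only an illustration under that assumption: count_with_mutex is a hypothetical helper, and the read and the write are spelled out as two separate steps so that the mutex visibly makes each increment indivisible.

```ruby
# Increment a shared counter from several threads.
# The read and the write are separate steps, so without the mutex
# two threads could read the same value and one update would be lost.
def count_with_mutex(thread_count, increments)
  counter = 0
  mutex = Mutex.new

  threads = Array.new(thread_count) do
    Thread.new do
      increments.times do
        mutex.synchronize do
          current = counter     # read
          counter = current + 1 # write
        end
      end
    end
  end

  threads.each(&:join)
  counter
end

puts count_with_mutex(10, 1_000) # → 10000
```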
amnn answered Oct 10 '22 21:10



Okay, after some fiddling with threads and studying contributions by apeiros and asQuirreL, I came up with a solution that suits me.

I'll show sample usage first and the source code at the end.

Example 1: simple non-blocking execution

First, a JS example that I'm trying to mimic:

setTimeout( function() {
  console.log("world");
}, 0);

console.log("hello");

// Will print "hello" first, then "world".

Here's how I can do it with my tiny Ruby library:

# You wrap all your code into this...
Branch.new do

  # ...and you gain access to the `branch` method that accepts a block.
  # This block runs non-blockingly, just like in JS `setTimeout(callback, 0)`.
  branch { puts "world!" }

  print "Hello, "

end

# Will print "Hello, world!"

Note how you don't have to take care of creating threads or waiting for them to finish. The only scaffolding required is the Branch.new { ... } wrapper.

Example 2: synchronizing threads with a mutex

Now we'll assume that we're working with some input and output shared among threads.

JS code I'm trying to reproduce in Ruby:

var
  results = [],
  rounds = 5;

for (var i = 1; i <= rounds; i++) {

  console.log("Starting thread #" + i + ".");

  // Creating local scope
  (function(local_i) {
    setTimeout( function() {

      // Assuming there's a time-consuming operation here.

      results.push(local_i);
      console.log("Thread #" + local_i + " has finished.");

      if (results.length === rounds)
        console.log("All " + rounds + " threads have completed! Bye!");

    }, 0);
  })(i);
}

console.log("All threads started!");

This code produces the following output:

Starting thread #1.
Starting thread #2.
Starting thread #3.
Starting thread #4.
Starting thread #5.
All threads started!
Thread #5 has finished.
Thread #4 has finished.
Thread #3 has finished.
Thread #2 has finished.
Thread #1 has finished.
All 5 threads have completed! Bye!

Notice that the callbacks finish in reverse order.

We're also gonna assume that working with the results array may produce a race condition. In JS this is never an issue, but in multithreaded Ruby it has to be addressed with a mutex.

Ruby equivalent of the above:

Branch.new 1 do

  # Setting up an array to be filled with that many values.
  results = []
  rounds = 5

  # Running `branch` N times:
  1.upto(rounds) do |item|

    puts "Starting thread ##{item}."

    # The block passed to `branch` accepts a hash with mutexes 
    # that you can use to synchronize threads.
    branch do |mutexes|

      # This imitates that the callback may take time to complete.
      # Threads will finish in reverse order.
      sleep((6.0 - item) / 10)

      # When you need a mutex, you simply request one from the hash.
      # For each unique key, a new mutex will be created lazily.
      mutexes[:array_and_output].synchronize do
        puts "Thread ##{item} has finished!"
        results.push item

        if results.size == rounds
          puts "All #{rounds} threads have completed! Bye!"
        end
      end
    end
  end

  puts "All threads started."
end

puts "All threads finished!"

Note how you don't have to take care of creating threads, waiting for them to finish, creating mutexes, or passing them into the block.

Example 3: delaying execution of the block

If you need the delay feature of setTimeout, pass the delay to branch as an argument. Note that the delay is in seconds, not milliseconds.

JS:

setTimeout(function(){ console.log('Foo'); }, 2000);

Ruby:

branch(2) { puts 'Foo' }

Example 4: waiting for all threads to finish

With JS, there's no simple way to have the script wait for all threads to finish. You'll need an await/defer library for that.

But in Ruby it's possible, and Branch makes it even simpler. If you write code after the Branch.new{} wrapper, it will be executed after all branches within the wrapper have completed. You don't need to manually ensure that all threads have finished; Branch does that for you.

Branch.new do
  branch { sleep 10 }
  branch { sleep 5 }

  # This will be printed immediately
  puts "All threads started!"
end

# This will be printed after 10 seconds (the duration of the slowest branch).
puts "All threads finished!"

Consecutive Branch.new{} wrappers are executed one after another: the second wrapper starts only after every branch of the first one has finished.
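A small sketch of that ordering. To keep it runnable on its own, it includes a condensed copy of the Branch class from the Source section; sequential_wrappers_demo and the event strings are hypothetical names used only for this illustration.

```ruby
# Condensed copy of the Branch class from the Source section.
class Branch
  def initialize(&block)
    @threads = []
    @mutexes = Hash.new { |hash, key| hash[key] = Mutex.new }
    instance_eval(&block)
    @threads.each(&:join) # wait for all branches of this wrapper
  end

  def branch(delay = false, &block)
    @threads << Thread.new do
      sleep delay if delay.is_a? Numeric
      block.call @mutexes
    end
  end
end

def sequential_wrappers_demo
  events = Queue.new # thread-safe, so no mutex is needed for logging

  Branch.new do
    branch { sleep 0.2; events << "first wrapper: branch done" }
    events << "first wrapper: body done"
  end

  # Starts only after the first wrapper has joined all of its threads.
  Branch.new do
    branch { events << "second wrapper: branch done" }
  end

  # Drain the queue into a plain array, oldest event first.
  Array.new(events.size) { events.pop }
end

puts sequential_wrappers_demo
```

The branch of the second wrapper is always logged after the branch of the first one, even though the first one sleeps.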

Source

# (c) lolmaus (Andrey Mikhaylov), 2014
# MIT license http://choosealicense.com/licenses/mit/

class Branch
  def initialize(mutexes = 0, &block)
    @threads = []
    @mutexes = Hash.new { |hash, key| hash[key] = Mutex.new }

    # Executing the passed block within the context
    # of this class' instance.
    instance_eval(&block)

    # Waiting for all threads to finish
    @threads.each { |thr| thr.join }
  end

  # This method will be available within a block
  # passed to `Branch.new`.
  def branch(delay = false, &block)

    # Starting a new thread 
    @threads << Thread.new do

      # Implementing the timeout functionality
      sleep delay if delay.is_a? Numeric

      # Executing the block passed to `branch`,
      # providing mutexes into the block.
      block.call @mutexes
    end
  end
end
Andrey Mikhaylov - lolmaus answered Oct 10 '22 19:10
