Crystal: converting the idea behind a thread pool to fibers/spawn

I'm having a hard time learning the idea behind fibers/coroutines and their implementation in Crystal.

I hope this is the right place to ask this, I'll totally accept a "not here" answer :)

This is my usual way of handling multi-threading in Ruby:

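threads = []
max_threads = 10

loop do
  begin
    # Start one worker thread per unit of work (e.g. an incoming connection).
    threads << Thread.new do
      helper_method(1, 2, 3, 4)
    end
  rescue StandardError => e
    puts "Error starting thread: #{e.message}"
  end

  begin
    # Keep only the live threads, joining each finished one as it is dropped.
    threads = threads.select { |t| t.alive? ? true : (t.join; false) }
    # At the limit: wait and re-check until at least one thread has finished.
    while threads.size >= max_threads
      puts 'Got maximum threads'
      sleep 1
      threads = threads.select { |t| t.alive? ? true : (t.join; false) }
    end
  rescue StandardError => e
    puts e
  end
end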

This way I open a new thread, usually for an incoming connection or something similar, add it to a threads array, and then check that I don't have more threads than I wanted.

What would be a good way to implement something similar in Crystal using spawn, channels, fibers, etc.?

Ba7a7chy asked Jun 15 '15 19:06


2 Answers

Something like this:

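require "socket"

# Unbuffered channel that hands accepted sockets to the worker fibers.
ch = Channel(TCPSocket).new

# Pre-spawn 10 worker fibers; each one loops forever, serving one socket at a time.
10.times do
  spawn do
    loop do
      socket = ch.receive
      socket.puts "Hi!"
      socket.close
    end
  end
end

# Accept loop: `send` blocks until one of the workers is ready to receive.
server = TCPServer.new(1234)
loop do
  socket = server.accept
  ch.send socket
end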

This code pre-spawns 10 fibers to handle the requests. The channel is unbuffered, so connections won't queue up inside it when no fiber is free to handle them: ch.send blocks the accept loop until one of the fibers is ready to receive.
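To see the blocking behaviour without a network, here is a minimal sketch of the same shape, assuming integers in place of sockets. The names `jobs` and `done`, the worker count, and the simulated delay are made up for this illustration. The commented-out line shows the buffered variant, where a limited number of jobs may wait in the channel instead of blocking the sender.

```crystal
# Integers stand in for the sockets; `jobs` plays the role of `ch` above.
jobs = Channel(Int32).new      # unbuffered: `send` blocks until a worker receives
# jobs = Channel(Int32).new(8) # buffered variant: up to 8 jobs may wait in the channel
done = Channel(Int32).new      # hypothetical channel used only to collect results

# Three worker fibers, each handling one job at a time.
3.times do
  spawn do
    loop do
      job = jobs.receive
      sleep 10.milliseconds    # simulated work
      done.send job
    end
  end
end

# Producer fiber, standing in for the accept loop.
spawn do
  6.times { |i| jobs.send i }
end

# The main fiber blocks here until all six jobs have been reported.
results = Array.new(6) { done.receive }
puts results.sort
```

With the unbuffered channel the producer can never get ahead of the workers, which is what bounds the amount of work in flight.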

waj answered Nov 05 '22 05:11


You can't replicate the way it works for threads. spawn doesn't return a coroutine object, and there is no way to join coroutines.

Yet we can open a channel to communicate between the coroutines and the pool manager. This manager may run within its own coroutine or be the main coroutine, which will prevent the process from exiting.
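As a minimal sketch of that last point: the main coroutine can stand in for a join by blocking on a channel. The `done` channel and the `finished` flag are names made up for this illustration.

```crystal
# Hypothetical signalling channel; it carries no data, only "I am finished".
done = Channel(Nil).new
finished = false

spawn do
  puts "working in a coroutine"
  finished = true
  done.send(nil)
end

# Without this receive the main coroutine could reach the end of the program,
# and the process would exit, before the spawned coroutine ever ran.
done.receive
puts "coroutine finished"
```

The blocking receive is what keeps the process alive; the pool manager relies on the same effect by waiting on the workers' result channels.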

Here is a working example. The worker(&block) method spawns a coroutine and opens a channel that reports how the coroutine ended (it failed or it terminated). The pool(size, &block) method keeps a pool of such workers, reads from the result channels to learn the state of the coroutines, and keeps spawning new ones.

def worker(&block)
  # One unbuffered result channel per worker: nil on success, the exception on failure.
  result = Channel(Exception?).new

  ::spawn do
    begin
      block.call
    rescue ex
      result.send(ex)
    else
      result.send(nil)
    end
  end

  result
end

def pool(size, &block)
  counter = 0
  results = [] of Channel(Exception?)

  loop do
    # Top the pool back up to `size` running workers.
    while counter < size
      counter += 1
      puts "spawning worker"
      results << worker(&block)
    end

    # Block until any worker reports; `index` is the position of its channel in `results`.
    index, ex = Channel.select(results.map(&.receive_select_action))
    counter -= 1
    results.delete_at(index)

    if ex
      puts "ERROR: #{ex.message}"
    else
      puts "worker terminated"
    end
  end
end

pool(5) do
  loop { helper_method(1, 2, 3, 4) }
end

Julien Portalier answered Nov 05 '22 05:11