I'm having some hard time learning the idea behind Fibers\coroutines and the implementation in Crystal.
I hope this is the right place to ask this, I'll totally accept a "not here" answer :)
This is my usual way of handling multi-threading in Ruby:
threads = []
max_threads = 10
loop do
begin
threads << Thread.new do
helper_method(1,2,3,4)
end
rescue Exception => e
puts "Error Starting thread"
end
begin
threads = threads.select { |t| t.alive? ? true : (t.join; false) }
while threads.size >= max_threads
puts 'Got Maximum threads'
sleep 1
threads = threads.select { |t| t.alive? ? true : (t.join; false) }
end
rescue Exception => e
puts e
end
end
This way I open a new Thread, usually of a incoming connection or some other thing, add the Thread to a threads array, and then check that I don't have more threads then what I wanted.
What would be a good way to implement something similar in Crystal using spawn\channels\fibers etc.. ?
Something like this:
require "socket"
ch = Channel(TCPSocket).new
10.times do
spawn do
loop do
socket = ch.receive
socket.puts "Hi!"
socket.close
end
end
end
server = TCPServer.new(1234)
loop do
socket = server.accept
ch.send socket
end
This code will pre-spawn 10 fibers to attend the requests. The channel is unbuffered so the connections wont be queuing if they cannot be attended by any fiber.
You can't replicate the way it works for threads. spawn
doesn't return a coroutine object, and there ain't no way to join
coroutines.
Yet we can open a channel to communicate between the coroutines and the pool manager. This manager may run within it's own coroutine or be the main coroutine —that will prevent the process from exiting.
Here is a working example, with a worker(&block)
method that will spawn a coroutine, and open a channel to return its status (it failed or it terminated), and a pool(&block)
method that will keep a pool of such workers and read from the result channels to know the state of the coroutines, and keep spawning new ones.
def worker(&block)
result = UnbufferedChannel(Exception?).new
::spawn do
begin
block.call
rescue ex
result.send(ex)
else
result.send(nil)
end
end
result
end
def pool(size, &block)
counter = 0
results = [] of UnbufferedChannel(Exception?)
loop do
while counter < size
counter += 1
puts "spawning worker"
results << worker(&block)
end
result = Channel.select(results)
counter -= 1
results.delete(result)
if ex = result.receive
puts "ERROR: #{ex.message}"
else
puts "worker terminated"
end
end
end
pool(5) do
loop { helper_method(1, 2, 3, 4) }
end
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With