I am building a job queue in Elixir as an academic exercise. Currently my workers have to register themselves with the queue manually when they're created (see `MyQuestion.Worker.start_link`).
I'd like my supervisor to register the available workers with the queue when they are created or restarted, as this seems like it would make the workers easier to test and reduce coupling.
Is there a way to do what I've sketched in `MyQuestion.Supervisor` in the code below?
# The question's supervisor, written the way the asker would like it to work.
# child_spawned/2 and child_terminated/3 are NOT Supervisor callbacks: nothing
# ever calls them (the question itself says "imagine the callback existed").
# They only illustrate the desired hooks.
defmodule MyQuestion.Supervisor do
  use Supervisor

  # Starts the supervisor and returns the result of Supervisor.start_link/2.
  def start_link do
    Supervisor.start_link(__MODULE__, :ok)
  end

  @impl true
  def init(:ok) do
    children = [
      Supervisor.Spec.worker(MyQuestion.JobQueue, []),
      Supervisor.Spec.worker(MyQuestion.Worker, [], id: :worker_0),
      Supervisor.Spec.worker(MyQuestion.Worker, [], id: :worker_1)
    ]

    # :rest_for_one: if a child terminates, it and every child started after
    # it (here: the workers listed after the queue) are restarted.
    Supervisor.Spec.supervise(children, strategy: :rest_for_one)
  end

  # LOOKING FOR SOMETHING LIKE THIS
  # on worker spawn, I want to add the worker to the queue
  # NOTE(review): the visible MyQuestion.JobQueue defines new_worker/1, not
  # add_new_worker/1 — presumably the latter is in its snipped part; confirm.
  def child_spawned(pid, {MyQuestion.Worker, _, _}) do
    MyQuestion.JobQueue.add_new_worker(pid)
  end

  # LOOKING FOR SOMETHING LIKE THIS
  # I want some way to do the following (imagine the callback existed).
  # With this information the job queue could mark the job associated with
  # `pid` as failed and retry it (or extract the job id from the worker
  # state, etc.).
  def child_terminated(pid, _reason, _state) do
    MyQuestion.JobQueue.remove_worker(pid)
    MyQuestion.JobQueue.restart_job_for_failed_worker(pid)
  end
end
# Sketch of the question's job queue: an Agent whose state starts as [] and
# which is registered under the module name. Parts of the implementation are
# elided with "<... snip ...>", so this block does not compile as written.
defmodule MyQuestion.JobQueue do
# Starts the Agent with [] as its initial state, registered as MyQuestion.JobQueue.
def start_link do
Agent.start_link(fn -> [] end, name: __MODULE__)
end
# Intended to record `pid` as an available worker. The body is not written
# yet, so it currently returns nil.
# NOTE(review): MyQuestion.Supervisor and MyQuestion.Worker call
# add_new_worker/1, not new_worker/1 — presumably add_new_worker/1 is in a
# snipped part; confirm the name.
def new_worker(pid) do
# register pid with agent state in available worker list, etc.
end
# Intended to hand `job_description` to an idle worker (implementation snipped).
def add_job(job_description) do
# find idle worker and run job
<... snip ...>
end
<... snip ...>
end
# Sketch of the question's worker GenServer (remaining callbacks elided with
# "<... snip ...>", so this block does not compile as written).
defmodule MyQuestion.Worker do
use GenServer
# Starts the worker and then registers its pid with MyQuestion.JobQueue
# itself. This is the coupling the question wants to move into the supervisor.
# Returns {:ok, pid}. If GenServer.start_link/3 fails, the match below raises
# a MatchError instead of returning the error.
def start_link do
# start worker
{:ok, worker} = GenServer.start_link(__MODULE__, [])
# Now we have a worker pid, so we can register that pid with the queue
# I wish this could be in the supervisor or else where.
MyQuestion.JobQueue.add_new_worker(worker)
# must return gen server's start link
{:ok, worker}
end
<... snip ...>
end
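The key was a combination of two things. First, call `Process.monitor(pid)`; after that, the monitoring process receives a `{:DOWN, ref, :process, pid, reason}` message in `handle_info` when the monitored process exits. Second, call `Supervisor.start_child` manually, which returns the new child's pid.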
I had previously tried to use `handle_info` but could never get it to be called. `Process.monitor(pid)` must be called from the process that should receive the notifications. So you have to call it from inside one of your server's callbacks (for example `handle_call`) to associate the monitor with your server process. There may be a function to run code as another process (e.g. `run_from_process(job_queue_pid, fn -> Process.monitor(pid_to_monitor) end)`), but I could not find anything.
Attached is a very naive implementation of a job queue. I'm only a day into Elixir, so the code is both messy and non-idiomatic. I'm attaching it anyway because there seems to be a lack of example code on this topic.
Look at `HeavyIndustry.JobQueue`, in particular `handle_info` and `start_new_worker`. There is one obvious issue with this code. It can restart workers when they crash, but it cannot start the next job from that code path. Doing so would require a `GenServer.call` inside `handle_info`, and the server calling itself deadlocks. I think you could fix this by separating the process that starts jobs from the process that tracks jobs. If you run the example code, you will notice that it eventually stops running jobs even though one is still in the queue (the `:crash` job).
# Top-level supervisor for the demo. It starts empty; the job queue and the
# workers are added to it at runtime with Supervisor.start_child/2.
defmodule HeavyIndustry.Supervisor do
  use Supervisor

  # Starts the (initially childless) supervisor.
  def start_link do
    Supervisor.start_link(__MODULE__, :ok)
  end

  @impl true
  def init(:ok) do
    # default to supervising nothing; children are added dynamically
    Supervisor.Spec.supervise([], strategy: :one_for_one)
  end

  # Starts HeavyIndustry.JobQueue under `supervisor`, configured for
  # `worker_count` workers, and returns Supervisor.start_child/2's result
  # (e.g. {:ok, pid}). The queue starts with no workers; its setup/0 starts them.
  def create_children(supervisor, worker_count) do
    # Supervisor.Spec.worker/2 applies start_link to this argument list as-is,
    # so supply both of JobQueue.start_link/2's arguments: the
    # [supervisor, worker_count] pair and its (unused) second argument.
    spec = Supervisor.Spec.worker(HeavyIndustry.JobQueue, [[supervisor, worker_count], []])
    Supervisor.start_child(supervisor, spec)
  end
end
# A naive job queue. It keeps a list of jobs plus a pool of idle and busy
# worker pids, hands pending jobs to idle workers, and replaces workers that
# die, putting a dead worker's job back in the queue.
defmodule HeavyIndustry.JobQueue do
  use GenServer

  @job_queue_name __MODULE__

  ##
  # Public API
  ##

  # Starts the queue process, registered as HeavyIndustry.JobQueue.
  #
  # `args` must be [supervisor, worker_count]. The second argument is ignored.
  # It has a default so that the queue can also be started through
  # start_link/1, which is what Supervisor.Spec.worker(__MODULE__, [args])
  # invokes.
  def start_link(args, _opts \\ []) do
    GenServer.start_link(__MODULE__, args, name: @job_queue_name)
  end

  # Starts `worker_count` workers under the supervisor and monitors each one.
  # Returns :ok.
  def setup() do
    # Workers are started and monitored inside the :setup callback, i.e. by
    # the queue process itself, so the {:DOWN, ...} messages are delivered to
    # the queue. (A monitor is not a link: it only notifies the process that
    # called Process.monitor/1.)
    _state = GenServer.call(@job_queue_name, :setup)
    :ok
  end

  # Queues `job` and tries to start the next pending job right away.
  #
  # `from` is the pid that will receive %{answer: result, job: job} when the
  # job finishes. Returns one of:
  #   * {:ok, :started}  - this job was handed to a worker
  #   * {:ok, :pending}  - it is waiting (possibly another job started instead)
  #   * {:error, reason} - anything else
  def add_job(from, job) do
    {:ok, our_job_id} =
      GenServer.call(@job_queue_name, {:create_job, %{job: job, reply_to: from}})

    case start_next_job() do
      {:ok, ^our_job_id} -> {:ok, :started}
      {:ok, _other_job_id} -> {:ok, :pending}
      # couldn't start any job, but the new one is safely queued
      {:error, :no_idle_workers} -> {:ok, :pending}
      {:error, reason} -> {:error, reason}
    end
  end

  # Asks the queue to hand the next pending job to an idle worker. Returns
  # {:ok, job_id}, {:error, :no_idle_workers} or {:error, :no_more_jobs}.
  def start_next_job do
    GenServer.call(@job_queue_name, :start_next_job)
  end

  ##
  # Internal API
  ##

  @impl true
  def init([supervisor, n]) do
    state = %{
      supervisor: supervisor,
      max_workers: n,
      jobs: [],
      workers: %{idle: [], busy: []}
    }

    {:ok, state}
  end

  @impl true
  def handle_call(:setup, _from, state) do
    # max/2 guards against n <= 0: the old 0..(n - 1) range counted *down*
    # and started workers even when none were requested.
    count = max(state.max_workers, 0)

    workers =
      fn -> start_monitored_worker(state.supervisor) end
      |> Stream.repeatedly()
      |> Enum.take(count)

    state = %{state | workers: %{state.workers | idle: workers}}
    {:reply, state, state}
  end

  # Kept for compatibility: monitors the given pids from the queue process.
  # setup/0 no longer needs it, because :setup monitors the workers it starts.
  def handle_call({:monitor_pids, list}, _from, state) do
    Enum.each(list, &Process.monitor/1)
    {:reply, :ok, state}
  end

  def handle_call({:create_job, job}, _from, state) do
    new_job = %{
      job: job.job,
      reply_to: job.reply_to,
      # unique and increasing; :os.system_time/0 is not guaranteed unique
      id: System.unique_integer([:positive, :monotonic]),
      # start pending, go active, then remove
      status: :pending,
      pid: nil
    }

    {:reply, {:ok, new_job.id}, %{state | jobs: state.jobs ++ [new_job]}}
  end

  def handle_call(:start_next_job, _from, state) do
    IO.puts "==> Start Next Job"
    IO.inspect state
    IO.puts "=================="
    {reply, state} = dispatch_next_job(state)
    {:reply, reply, state}
  end

  def handle_call({:free_worker, worker}, _from, state) do
    idle = state.workers.idle ++ [worker]
    busy = state.workers.busy -- [worker]
    {:reply, :ok, %{state | workers: %{idle: idle, busy: busy}}}
  end

  def handle_call({:remove_job, job}, _from, state) do
    {:reply, :ok, %{state | jobs: state.jobs -- [job]}}
  end

  # A monitored worker died: requeue its job (if any) and replace it.
  @impl true
  def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
    # inspect/1: exit reasons are often tuples, which cannot be interpolated.
    IO.puts "Worker collapsed: #{inspect reason} #{inspect pid}, clear and restart job"

    # An idle worker has no job (Enum.find/2 returns nil); a busy one has
    # its job put back at the end of the queue as pending.
    jobs =
      case Enum.find(state.jobs, &(&1.pid == pid)) do
        nil -> state.jobs
        job -> (state.jobs -- [job]) ++ [%{job | status: :pending, pid: nil}]
      end

    # handle_info runs in the queue process, so monitoring here is enough.
    replacement = start_monitored_worker(state.supervisor)

    workers = %{
      idle: (state.workers.idle -- [pid]) ++ [replacement],
      busy: state.workers.busy -- [pid]
    }

    # NOTE: the requeued job is not started here; it waits for the next
    # start_next_job call. Calling start_next_job/0 from this process would
    # deadlock (GenServer.call to itself), and dispatching directly would
    # retry a job that always crashes its worker forever.
    {:noreply, %{state | jobs: jobs, workers: workers}}
  end

  # Ignore any other message instead of crashing the queue.
  def handle_info(_msg, state) do
    {:noreply, state}
  end

  # Pairs the first idle worker with the first pending job, marks both as in
  # use and runs the job. Returns {reply, new_state}. The new state is
  # returned from the case explicitly because variables rebound inside a case
  # branch are not visible after the case.
  defp dispatch_next_job(state) do
    case {find_idle_worker(state.workers), find_next_job(state.jobs)} do
      {{:error, :no_idle_workers}, _} ->
        # no workers for job, doesn't matter if we have a job
        {{:error, :no_idle_workers}, state}

      {_, nil} ->
        # no job, doesn't matter if we have a worker
        {{:error, :no_more_jobs}, state}

      {{:ok, worker}, job} ->
        active_job = %{job | status: :active, pid: worker}
        jobs = (state.jobs -- [job]) ++ [active_job]
        workers = %{idle: state.workers.idle -- [worker], busy: state.workers.busy ++ [worker]}
        run_job(active_job, worker)
        {{:ok, active_job.id}, %{state | jobs: jobs, workers: workers}}
    end
  end

  # Runs `job` on `worker` in an unlinked task, so a crashing worker only
  # kills the task, not the queue. On success the task removes the job,
  # frees the worker, sends the result to job.reply_to and asks for the next
  # job. These are ordinary calls made from the task process, so they do not
  # deadlock the queue.
  defp run_job(job, worker) do
    {:ok, _task} =
      Task.start(fn ->
        result = GenServer.call(worker, job.job)
        remove_job(job)
        free_worker(worker)
        send(job.reply_to, %{answer: result, job: job.job})
        start_next_job()
      end)
  end

  # Starts a worker and monitors it from the calling process. It must run
  # inside the queue process so the queue receives the worker's :DOWN message.
  defp start_monitored_worker(supervisor) do
    {:ok, pid} = start_new_worker(supervisor)
    Process.monitor(pid)
    pid
  end

  # Adds a temporary HeavyIndustry.Worker child to `supervisor`.
  defp start_new_worker(supervisor) do
    # Child ids may be any term. A unique integer avoids id clashes between
    # workers started in quick succession, and avoids creating a new atom
    # per worker (atoms are never garbage collected).
    id = {HeavyIndustry.Worker, System.unique_integer([:positive])}
    spec = Supervisor.Spec.worker(HeavyIndustry.Worker, [], id: id, restart: :temporary)
    Supervisor.start_child(supervisor, spec)
  end

  defp find_idle_worker(%{idle: []}), do: {:error, :no_idle_workers}
  defp find_idle_worker(%{idle: [worker | _]}), do: {:ok, worker}

  defp find_next_job(jobs) do
    Enum.find(jobs, &(&1.status == :pending))
  end

  defp free_worker(worker) do
    GenServer.call(@job_queue_name, {:free_worker, worker})
  end

  defp remove_job(job) do
    GenServer.call(@job_queue_name, {:remove_job, job})
  end
end
# Stateless demo worker. Each synchronous call runs one job; some jobs
# deliberately stop, crash or time out to exercise the queue's failure handling.
defmodule HeavyIndustry.Worker do
  use GenServer

  # Starts an unnamed worker process.
  def start_link do
    GenServer.start_link(__MODULE__, :ok)
  end

  @impl true
  def init(:ok) do
    # workers have no persistent state
    IO.puts "==> Worker up! #{inspect self()}"
    {:ok, nil}
  end

  # {:sum, enumerable} replies with the sum of the numbers (0 when empty).
  @impl true
  def handle_call({:sum, list}, _from, _state) do
    {:reply, Enum.sum(list), nil}
  end

  # {:fib, n} replies with the n-th Fibonacci number (slow, on purpose).
  def handle_call({:fib, n}, _from, _state) do
    {:reply, fib_calc(n), nil}
  end

  # {:stop} stops the worker normally-ish: replies, then exits with a reason.
  def handle_call({:stop}, _from, state) do
    {:stop, "my-stop-reason", "my-stop-reply", state}
  end

  # {:crash} deliberately raises (++ on a non-list) to crash the worker.
  def handle_call({:crash}, _from, _state) do
    {:reply, "this will crash" ++ 1234, nil}
  end

  # {:timeout} sleeps longer than GenServer.call/2's default 5 s timeout.
  def handle_call({:timeout}, _from, _state) do
    :timer.sleep 10000
    {:reply, "this will timeout", nil}
  end

  # Slow, exponential-time Fibonacci. Negative or non-integer n matches no
  # clause and crashes the worker instead of recursing forever.
  defp fib_calc(0), do: 0
  defp fib_calc(1), do: 1
  defp fib_calc(n) when is_integer(n) and n > 1, do: fib_calc(n - 1) + fib_calc(n - 2)
end
# Demo driver: starts the supervisor and a two-worker job queue, submits a
# batch of jobs, then prints every message it receives, forever.
defmodule Looper do
  def start do
    {:ok, pid} = HeavyIndustry.Supervisor.start_link()
    {:ok, _job_queue} = HeavyIndustry.Supervisor.create_children(pid, 2)
    HeavyIndustry.JobQueue.setup()
    add_jobs()
    loop()
  end

  # Submits the demo jobs, reporting whether each started immediately or was
  # queued. Results arrive later as messages to this process.
  def add_jobs do
    jobs = [
      {:sum, [100, 200, 300]},
      {:crash},
      {:fib, 35},
      {:fib, 35},
      {:sum, [88, 88, 99]},
      {:fib, 35},
      {:fib, 35},
      {:fib, 35},
      {:sum, 0..100},
      # {:stop}, # stop not really a failure
      {:sum, [88, 88, 99]},
      # {:timeout},
      {:sum, [-1]}
    ]

    Enum.each(jobs, fn job ->
      IO.puts "~~~~> Add job: #{inspect job}"

      case HeavyIndustry.JobQueue.add_job(self(), job) do
        {:ok, :started} -> IO.puts "~~~~> Started job immediately"
        {:ok, :pending} -> IO.puts "~~~~> Job in queue"
        val -> IO.puts "~~~~> ... val: #{inspect val}"
      end
    end)
  end

  # Prints each received message; never returns.
  def loop do
    receive do
      value ->
        IO.puts "~~~~> Received: #{inspect value}"
        loop()
    end
  end
end

Looper.start()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With