Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Detecting Elixir/OTP supervisor child spawn and termination

I am building a job queue in Elixir as an academic exercise. Currently my workers have to register themselves with the queue manually when they are created (see `MyQuestion.Worker.start_link`).

I'd like my supervisor to register the available workers with the queue when they are created or restarted, as this seems like it would make the workers easier to test and reduce coupling.

Is there a way to do what I've described in the code below in MyQuestion.Supervisor?

defmodule MyQuestion.Supervisor do
  @moduledoc """
  Supervises the job queue and two workers with the `:rest_for_one` strategy,
  so a child that terminates causes the children started after it to be
  restarted as well.

  `child_spawned/2` and `child_terminated/3` sketch the hooks the question asks
  for. `Supervisor` defines no such callbacks and never calls them; they run
  only if something invokes them explicitly.
  """
  use Supervisor

  def start_link do
    Supervisor.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    children = [
      worker(MyQuestion.JobQueue, []),
      worker(MyQuestion.Worker, [], id: :worker_0),
      worker(MyQuestion.Worker, [], id: :worker_1)
    ]

    supervise(children, strategy: :rest_for_one)
  end

  # LOOKING FOR SOMETHING LIKE THIS
  # on worker spawn, I want to add the worker to the queue
  def child_spawned(pid, {MyQuestion.Worker, _, _}) do
    # add worker to queue
    # NOTE(review): the visible MyQuestion.JobQueue excerpt defines new_worker/1;
    # add_new_worker/1 is presumably in its snipped part — confirm.
    MyQuestion.JobQueue.add_new_worker(pid)
  end

  # LOOKING FOR SOMETHING LIKE THIS
  # I want some way to do the following (imagine the callback existed)
  def child_terminated(pid, _reason, _state) do
    # with this information I could tell the job queue to mark
    # the job associated with the pid as failed and to retry
    # or maybe extract the job id from the worker state, etc.
    MyQuestion.JobQueue.remove_worker(pid)
    MyQuestion.JobQueue.restart_job_for_failed_worker(pid)
  end
end

# Sketch of the queue from the question. Parts are elided ("<... snip ...>")
# in the original post, so this module does not compile as shown.
defmodule MyQuestion.JobQueue do
  # Starts an Agent registered under this module's name whose initial state
  # is an empty list.
  def start_link do
    Agent.start_link(fn -> [] end, name: __MODULE__)
  end

  # Placeholder: the body is only a comment, so this currently returns nil
  # and does not touch the Agent state.
  # NOTE(review): MyQuestion.Supervisor and MyQuestion.Worker call
  # add_new_worker/1, not new_worker/1 — confirm which name is intended.
  def new_worker(pid) do
    # register pid with agent state in available worker list, etc.
  end

  # Intended to hand job_description to an idle worker; body elided.
  def add_job(job_description) do
    # find idle worker and run job
    <... snip ...>
  end

  <... snip ...>
end

# Worker from the question. The rest of the module is elided ("<... snip ...>"),
# so it does not compile as shown.
defmodule MyQuestion.Worker do
  use GenServer
  # Starts the worker GenServer, then registers its pid with the job queue.
  # This registration is the coupling the question wants to move into the
  # supervisor. Crashes with a MatchError if GenServer.start_link/2 does not
  # return {:ok, pid}; otherwise returns {:ok, worker}.
  def start_link do
    # start worker
    {:ok, worker} = GenServer.start_link(__MODULE__, [])

    # Now we have a worker pid, so we can register that pid with the queue
    # I wish this could be in the supervisor or elsewhere.
    MyQuestion.JobQueue.add_new_worker(worker)

    # must return gen server's start link
    {:ok, worker}
  end

  <... snip ...>
end
like image 642
Soup Avatar asked Oct 17 '15 13:10

Soup


1 Answers

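The key was a combination of calling `Process.monitor(pid)` — after which the monitoring process receives a `{:DOWN, ref, :process, pid, reason}` message in `handle_info` when the monitored process exits — and manually calling `Supervisor.start_child`, which returns the new child's pid.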

I had previously tried to use handle_info but could never get it to be called. `Process.monitor(pid)` must be called from the process that should receive the notifications, so you have to call it from inside a server callback such as `handle_call` to associate the monitor with your server process. There may be a function to run code as another process (e.g. `run_from_process(job_queue_pid, fn -> Process.monitor(pid_to_monitor) end)`), but I could not find one.

Attached is a very naive implementation of a job queue. I'm only a day into Elixir, so the code is both messy and non-idiomatic, but I'm attaching it because there seems to be a lack of example code on this topic.

Look at `HeavyIndustry.JobQueue`, `handle_info` and `start_new_worker`. There is one obvious issue with this code: it can restart workers when they crash, but it cannot start the next queued job from that code, because that would require a `GenServer.call` inside `handle_info`, which deadlocks the server. I think you could fix this by separating the process that starts jobs from the process that tracks jobs. If you run the example code, you will notice that it eventually stops running jobs even though one is still in the queue (the `:crash` job).

defmodule HeavyIndustry.Supervisor do
  @moduledoc """
  Top-level supervisor for the job-queue example.

  It starts with no children. `create_children/2` adds the job queue, and the
  queue later adds its worker children to this same supervisor.
  """
  use Supervisor

  alias HeavyIndustry.JobQueue

  @doc "Starts an unnamed supervisor with no children."
  def start_link, do: Supervisor.start_link(__MODULE__, :ok)

  def init(:ok) do
    # Nothing to supervise yet; children arrive via Supervisor.start_child/2.
    Supervisor.Spec.supervise([], strategy: :one_for_one)
  end

  @doc """
  Adds the job queue as a child of `supervisor`, starting it with the single
  argument `[supervisor, worker_count]`.

  Returns whatever `Supervisor.start_child/2` returns.
  """
  def create_children(supervisor, worker_count) do
    queue_spec = Supervisor.Spec.worker(JobQueue, [[supervisor, worker_count]])
    Supervisor.start_child(supervisor, queue_spec)
  end
end

defmodule HeavyIndustry.JobQueue do
  @moduledoc """
  A naive job queue that runs jobs on a pool of `HeavyIndustry.Worker`
  processes started under the supervisor given to `init/1`.

  The queue registers itself under its module name. It starts its workers from
  inside its own process and monitors each one, so a worker crash arrives here
  as a `:DOWN` message. The crashed worker's job is put back to `:pending` and
  a replacement worker is started.

  Known limitation: the `:DOWN` handler does not start the re-queued job. That
  job runs only when a later job finishes and calls `start_next_job/0`.
  """
  use GenServer

  @job_queue_name __MODULE__

  ##
  # Client API
  ##

  @doc """
  Starts the queue and registers it as `HeavyIndustry.JobQueue`.

  `args` must be `[supervisor, max_workers]`. The second parameter is unused.
  It has a default so that the one-argument call made by the child spec
  `worker(HeavyIndustry.JobQueue, [[supervisor, n]])` resolves to this function.
  """
  def start_link(args, _opts \\ []) do
    GenServer.start_link(__MODULE__, args, name: @job_queue_name)
  end

  @doc """
  Starts `max_workers` workers under the supervisor and monitors them.
  Returns `:ok`.
  """
  def setup do
    # Starting and monitoring both happen inside the server process (the
    # :setup clause), so :DOWN messages are delivered to the queue rather than
    # to the caller, and no worker can die before it is monitored.
    _state = GenServer.call(@job_queue_name, :setup)
    :ok
  end

  @doc """
  Enqueues `job` and tries to start the next pending job.

  When the job finishes, `%{answer: result, job: job}` is sent to `from`.
  Returns `{:ok, :started}` if this job was started immediately, `{:ok, :pending}`
  if it is waiting for a worker, or `{:error, reason}`.
  """
  def add_job(from, job) do
    {:ok, our_job_id} =
      GenServer.call(@job_queue_name, {:create_job, %{job: job, reply_to: from}})

    case GenServer.call(@job_queue_name, :start_next_job) do
      {:ok, ^our_job_id} -> {:ok, :started}
      # an older pending job took the idle worker
      {:ok, _other_job_id} -> {:ok, :pending}
      {:error, :no_idle_workers} -> {:ok, :pending}
      {:error, e} -> {:error, e}
    end
  end

  @doc """
  Starts the first pending job on an idle worker, if both exist.

  Returns `{:ok, job_id}`, `{:error, :no_idle_workers}` or `{:error, :no_more_jobs}`.
  """
  def start_next_job do
    GenServer.call(@job_queue_name, :start_next_job)
  end

  ##
  # Server callbacks
  ##

  def init([supervisor, n]) do
    state = %{
      supervisor: supervisor,
      max_workers: n,
      jobs: [],
      workers: %{idle: [], busy: []}
    }

    {:ok, state}
  end

  def handle_call(:setup, _from, state) do
    # Enum.take on the stream yields [] when max_workers is 0 (clamped to be
    # non-negative). The former 0..(n - 1) counted down to -1 and started two
    # workers in that case.
    workers =
      Stream.repeatedly(fn -> start_new_worker(state.supervisor) end)
      |> Enum.take(max(state.max_workers, 0))

    state = %{state | workers: %{state.workers | idle: workers}}
    {:reply, state, state}
  end

  def handle_call({:monitor_pids, pids}, _from, state) do
    Enum.each(pids, &Process.monitor/1)
    {:reply, :ok, state}
  end

  def handle_call({:create_job, job}, _from, state) do
    new_job = %{
      job: job.job,
      reply_to: job.reply_to,
      # unique and increasing within this VM; :os.system_time may repeat for
      # jobs created in quick succession
      id: System.unique_integer([:positive, :monotonic]),
      status: :pending, # start pending, go active, then remove
      pid: nil
    }

    {:reply, {:ok, new_job.id}, %{state | jobs: state.jobs ++ [new_job]}}
  end

  def handle_call(:start_next_job, _from, state) do
    IO.puts "==> Start Next Job"
    IO.inspect state
    IO.puts "=================="

    {reply, state} = start_next(state)
    {:reply, reply, state}
  end

  def handle_call({:free_worker, worker}, _from, state) do
    idle = state.workers.idle ++ [worker]
    busy = state.workers.busy -- [worker]
    {:reply, :ok, %{state | workers: %{idle: idle, busy: busy}}}
  end

  def handle_call({:remove_job, job}, _from, state) do
    {:reply, :ok, %{state | jobs: state.jobs -- [job]}}
  end

  # A monitored worker exited: requeue its job (if it had one), drop it from
  # the pools and start a monitored replacement. The log prints the actual
  # exit reason (the old pattern bound the :DOWN tag as `reason`).
  def handle_info({:DOWN, _ref, :process, dead_pid, reason}, state) do
    IO.puts "Worker collapsed: #{inspect reason} #{inspect dead_pid}, clear and restart job"

    jobs = requeue_job_of(state.jobs, dead_pid)
    idle = state.workers.idle -- [dead_pid]
    busy = state.workers.busy -- [dead_pid]

    new_pid = start_new_worker(state.supervisor)

    {:noreply, %{state | jobs: jobs, workers: %{idle: idle ++ [new_pid], busy: busy}}}
  end

  # Ignore any other message instead of crashing the queue.
  def handle_info(_msg, state), do: {:noreply, state}

  ##
  # Internal helpers
  ##

  # Starts one temporary worker under `supervisor` and monitors it from the
  # calling process, which must be the queue so that :DOWN arrives here.
  # Returns the worker pid; crashes if the start fails.
  defp start_new_worker(supervisor) do
    # make_ref/0 gives a unique child id without creating a new atom per worker
    spec =
      Supervisor.Spec.worker(HeavyIndustry.Worker, [], id: make_ref(), restart: :temporary)

    {:ok, pid} = Supervisor.start_child(supervisor, spec)
    Process.monitor(pid)
    pid
  end

  # Pairs the first idle worker with the first pending job and returns
  # {reply, new_state}. The new state is returned explicitly because
  # rebinding `state` inside a `case` branch does not escape that branch.
  defp start_next(state) do
    case {find_idle_worker(state.workers), find_next_job(state.jobs)} do
      {{:error, :no_idle_workers}, _} ->
        # no workers for job, doesnt matter if we have a job
        {{:error, :no_idle_workers}, state}

      {_, nil} ->
        # no job, doesnt matter if we have a worker
        {{:error, :no_more_jobs}, state}

      {{:ok, worker}, job} ->
        active_job = %{job | status: :active, pid: worker}
        jobs = (state.jobs -- [job]) ++ [active_job]

        workers = %{
          idle: state.workers.idle -- [worker],
          busy: state.workers.busy ++ [worker]
        }

        run_job(active_job, worker)
        {{:ok, active_job.id}, %{state | jobs: jobs, workers: workers}}
    end
  end

  # Runs `job` on `worker` in an unlinked Task. After the worker replies, the
  # task removes the job, frees the worker, sends the answer to job.reply_to
  # and asks the queue for the next job. These calls are made from the task,
  # not from the queue, so they do not deadlock. If the worker crashes, the
  # GenServer.call exits the task and the queue's :DOWN clause requeues the job.
  defp run_job(job, worker) do
    {:ok, _task_pid} =
      Task.start(fn ->
        result = GenServer.call(worker, job.job)

        remove_job(job)
        free_worker(worker)

        send(job.reply_to, %{answer: result, job: job.job})

        start_next_job()
      end)

    :ok
  end

  defp find_idle_worker(%{idle: []}), do: {:error, :no_idle_workers}
  defp find_idle_worker(%{idle: [worker | _]}), do: {:ok, worker}

  defp find_next_job(jobs), do: Enum.find(jobs, &(&1.status == :pending))

  # Moves the job assigned to `pid` back to :pending at the end of the list.
  # A worker that was idle has no job, in which case `jobs` is returned as is.
  defp requeue_job_of(jobs, pid) do
    case Enum.find(jobs, &(&1.pid == pid)) do
      nil -> jobs
      job -> (jobs -- [job]) ++ [%{job | status: :pending, pid: nil}]
    end
  end

  defp free_worker(worker), do: GenServer.call(@job_queue_name, {:free_worker, worker})

  defp remove_job(job), do: GenServer.call(@job_queue_name, {:remove_job, job})
end

defmodule HeavyIndustry.Worker do
  @moduledoc """
  A stateless worker process that answers job requests sent with `GenServer.call/2`.

  Supported requests are `{:sum, enumerable}` and `{:fib, n}`. The requests
  `{:stop}`, `{:crash}` and `{:timeout}` exist to exercise failure handling.
  """
  use GenServer

  @doc "Starts an unnamed worker linked to the caller."
  def start_link do
    GenServer.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    # workers have no persistent state
    IO.puts "==> Worker up! #{inspect self()}"
    {:ok, nil}
  end

  # Sums the numbers in any enumerable. An empty one sums to 0 instead of
  # raising Enum.EmptyError (as Enum.reduce/2 without an accumulator did),
  # which used to crash the worker.
  def handle_call({:sum, list}, _from, _state) do
    {:reply, Enum.sum(list), nil}
  end

  def handle_call({:fib, n}, _from, _state) do
    {:reply, fib_calc(n), nil}
  end

  # Replies "my-stop-reply", then stops with reason "my-stop-reason".
  def handle_call({:stop}, _from, state) do
    {:stop, "my-stop-reason", "my-stop-reply", state}
  end

  # Deliberately fails (binary ++ integer raises) to crash the worker.
  def handle_call({:crash}, _from, _state) do
    {:reply, "this will crash" ++ 1234, nil}
  end

  # Sleeps 10 s, longer than the default 5 s GenServer.call/2 timeout.
  def handle_call({:timeout}, _from, _state) do
    :timer.sleep 10000
    {:reply, "this will timeout", nil}
  end

  # Slow (exponential-time) Fibonacci. The guard makes a negative n raise
  # FunctionClauseError instead of recursing forever.
  defp fib_calc(0), do: 0
  defp fib_calc(1), do: 1
  defp fib_calc(n) when n > 1, do: fib_calc(n - 1) + fib_calc(n - 2)
end

defmodule Looper do
  @moduledoc """
  Demo driver. It starts the supervisor, the job queue and its workers,
  enqueues a fixed list of jobs, then prints every message it receives, forever.
  """

  @doc "Runs the demo. Never returns, because it ends in `loop/0`."
  def start do
    {:ok, pid} = HeavyIndustry.Supervisor.start_link()
    {:ok, _job_queue} = HeavyIndustry.Supervisor.create_children(pid, 2)
    HeavyIndustry.JobQueue.setup()
    # explicit parentheses: bare zero-arity local calls are treated as
    # undefined variables by current Elixir compilers
    add_jobs()
    loop()
  end

  @doc """
  Enqueues the demo jobs, with the calling process as the reply target, and
  prints how the queue accepted each one.
  """
  def add_jobs do
    jobs = [
      {:sum, [100, 200, 300]},
      {:crash},
      {:fib, 35},
      {:fib, 35},
      {:sum, [88, 88, 99]},
      {:fib, 35},
      {:fib, 35},
      {:fib, 35},
      {:sum, 0..100},
      # {:stop}, # stop not really a failure

      {:sum, [88, 88, 99]},
      # {:timeout},
      {:sum, [-1]}
    ]

    Enum.each(jobs, fn job ->
      IO.puts "~~~~> Add job: #{inspect job}"

      case HeavyIndustry.JobQueue.add_job(self(), job) do
        {:ok, :started} -> IO.puts "~~~~> Started job immediately"
        {:ok, :pending} -> IO.puts "~~~~> Job in queue"
        val -> IO.puts "~~~~> ... val: #{inspect val}"
      end
    end)
  end

  @doc "Prints every message received by the calling process; never returns."
  def loop do
    receive do
      value ->
        IO.puts "~~~~> Received: #{inspect value}"
        loop()
    end
  end
end

# Run the demo when this file is evaluated. Looper.start/0 does not return,
# because it ends in the receive loop.
Looper.start()
like image 181
Soup Avatar answered Sep 23 '22 17:09

Soup