
Runtime-dynamic compute graph using Elixir GenStage

I'd like to be able to change a compute pipeline dynamically at runtime, but it seems that GenStage requires the compute graph to be defined at compile time through the subscribe_to: [...] mechanism. Is there a way to create dynamic compute graphs? For example, in the graph below, I'd like to switch, at runtime, between the "subtract 7" and "subtract 4" vertices in my pipeline graph.

[Image: pipeline graph with the "subtract 7" and "subtract 4" vertices]

Is this possible using GenStage? I will likely have very complex pipelines, so I need a solution that scales to changing graphs in complex ways, as opposed to ad-hoc solutions such as, in this case, parameterising the integer to subtract. I'd like to be able to add or remove entire subtrees, switch between subtrees, and add nodes to the graph, including splicing them into the middle of any subtree or of the main tree.

Please refer to the EDIT further down.

Here is the initial producer:

defmodule GenstageTest.Producer do
  use GenStage

  def start_link(initial \\ 1) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end
end

Here is one of the producer_consumers:

defmodule GenstageTest.PcTimesFive do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    {:producer_consumer, state, subscribe_to: [GenstageTest.PcAddOne]}
  end

  def handle_events(events, _from, state) do
    numbers =
      events
      |> Enum.map(&(&1 * 5))
    {:noreply, numbers, state}
  end
end

and here is the final consumer:

defmodule GenstageTest.Consumer do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state, subscribe_to: [GenstageTest.PcDivTwo]}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect({self(), event, state})
    end

    # As a consumer we never emit events
    {:noreply, [], state}
  end
end

It is all modelled on the Elixir School GenStage tutorial.

All the modules and the mix.exs can be found on github.

EDIT 3 days later after partial answer from @AquarHEAD L.

I have managed to get runtime subscriptions working. Here are some modified producers, producer_consumers, and consumers respectively:

Producer:

defmodule GenstageTest.Producer do
  use GenStage

  def start_link(initial \\ 1) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end

  def handle_info({:doprint}, state) do
    IO.puts "yep"
    {:noreply, [], state}
  end

  def handle_info({:cancel, sublink}, state) do
    GenStage.cancel sublink, []
    {:noreply, [], state}
  end

end

Producer_consumer:

defmodule GenstageTest.PcAddOne do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    {:producer_consumer, state}
  end

  def handle_events(events, _from, state) do
    numbers =
      events
      |> Enum.map(&(&1 + 1))
    {:noreply, numbers, state}
  end
end

Consumer:

defmodule GenstageTest.Consumer do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect event
      #File.write("/home/tbrowne/scratch/output.txt", 
      #  Kernel.inspect(event) <> " ", [:append])
      :timer.sleep(100)
    end

    # As a consumer we never emit events
    {:noreply, [], state}
  end
end

Once these are all available in the lib directory (remember to add {:gen_stage, "~> 0.11"} to your mix.exs deps), or copied and pasted into IEx, the following works perfectly:

{:ok, p} = GenstageTest.Producer.start_link(0)
{:ok, a1} = GenstageTest.PcAddOne.start_link()
{:ok, c} = GenstageTest.Consumer.start_link()
{:ok, link1} = GenStage.sync_subscribe(a1, to: p, min_demand: 0, max_demand: 1, cancel: :transient)
{:ok, link2} = GenStage.sync_subscribe(c, to: a1, min_demand: 0, max_demand: 1, cancel: :transient)

The issue now, though, is that I still don't know how to cancel a subscription. There is a cancel function and there is also a stop function. GenStage.stop(c), for example, seems to do nothing, while my various attempts at GenStage.cancel/3 only give errors.

To recap, what I need now is to be able to stop certain stages and replace them with others. What is the syntax for cancelling a subscription, and where is it called from? The docs do not explain it well, as they give no concrete example.

Thomas Browne asked Oct 24 '18 12:10



1 Answer

You can absolutely change the pipeline at runtime: check out the first example in the GenStage documentation. You can also use the :manual mode for fine-grained control over demand, and there is an API for cancelling subscriptions. I think these are enough to manage GenStage pipelines dynamically.
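To make the cancel part concrete, here is a minimal sketch of switching between a "subtract 7" and a "subtract 4" stage at runtime. It is not taken from the question's repository: the Sketch.* modules are hypothetical stand-ins, the script assumes Elixir 1.12+ (for Mix.install) and gen_stage 1.x, and it relies on my understanding that GenStage.cancel/3 takes the {producer_pid, subscription_tag} pair, where the tag is the value returned by GenStage.sync_subscribe, and that it can be called from a process other than the consumer.

```elixir
# Standalone script sketch; assumes Elixir 1.12+ and gen_stage 1.x.
Mix.install([{:gen_stage, "~> 1.0"}])

# Hypothetical producer: emits consecutive integers on demand.
defmodule Sketch.Counter do
  use GenStage

  def start_link(initial), do: GenStage.start_link(__MODULE__, initial)
  def init(counter), do: {:producer, counter}

  def handle_demand(demand, counter) do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end

# Hypothetical producer_consumer: subtracts n from every event.
defmodule Sketch.Subtract do
  use GenStage

  def start_link(n), do: GenStage.start_link(__MODULE__, n)
  def init(n), do: {:producer_consumer, n}

  # Emit {input, output} pairs so the active stage is visible downstream.
  def handle_events(events, _from, n) do
    {:noreply, Enum.map(events, &{&1, &1 - n}), n}
  end
end

# Hypothetical consumer: forwards every event to the process that started it.
defmodule Sketch.Collector do
  use GenStage

  def start_link(report_to), do: GenStage.start_link(__MODULE__, report_to)
  def init(report_to), do: {:consumer, report_to}

  def handle_events(events, _from, report_to) do
    Enum.each(events, &send(report_to, {:event, &1}))
    # Slow the pipeline down so the caller's mailbox is not flooded.
    Process.sleep(10)
    {:noreply, [], report_to}
  end
end

{:ok, producer} = Sketch.Counter.start_link(0)
{:ok, sub7} = Sketch.Subtract.start_link(7)
{:ok, sub4} = Sketch.Subtract.start_link(4)
{:ok, collector} = Sketch.Collector.start_link(self())

# cancel: :temporary keeps a consumer alive when its subscription is cancelled.
opts = [cancel: :temporary, min_demand: 0, max_demand: 1]

# Initial graph: producer -> sub7 -> collector
{:ok, up_tag} = GenStage.sync_subscribe(sub7, [to: producer] ++ opts)
{:ok, down_tag} = GenStage.sync_subscribe(collector, [to: sub7] ++ opts)

before_switch =
  receive do
    {:event, {input, output}} -> input - output
  after
    2_000 -> :timeout
  end

# Switch: cancel both of sub7's subscriptions, then wire sub4 in its place.
GenStage.cancel({sub7, down_tag}, :shutdown)
GenStage.cancel({producer, up_tag}, :shutdown)
{:ok, _} = GenStage.sync_subscribe(sub4, [to: producer] ++ opts)
{:ok, _} = GenStage.sync_subscribe(collector, [to: sub4] ++ opts)

# Events already in flight from sub7 may still arrive, so wait selectively
# for the first event that was produced by sub4.
after_switch =
  receive do
    {:event, {input, output}} when input - output == 4 -> input - output
  after
    2_000 -> :timeout
  end

IO.inspect({before_switch, after_switch})
```

Cancellation is asynchronous, so a few events from the old stage can still reach the consumer after GenStage.cancel returns. Events that sub7 has already pulled from the producer and buffered are not re-routed, so a real pipeline would need to decide whether losing them is acceptable. The same cancel-then-subscribe pattern should extend to splicing a stage into the middle of a graph: cancel the edge being replaced, then subscribe the new stage on both sides.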

aquarhead answered Nov 08 '22 18:11
