I'd like to be able to change a compute pipeline dynamically at runtime, but it seems that GenStage requires the compute graph to be defined at compile time through the subscribe_to: [...] mechanism. Is there a way to create dynamic compute graphs? For example, in the pipeline below, I'd like to switch at runtime between the "subtract 7" and "subtract 4" vertices of my pipeline graph.
Is this possible using GenStage? I will likely have very complex pipelines, so I need a solution that scales to changing graphs in complex ways, as opposed to ad-hoc solutions such as, in this case, parameterising the integer to subtract. I'd like to be able to add or remove entire subtrees, switch between subtrees, and add nodes to the graph, including splicing them into the middle of any subtree or of the main tree.
Here is the initial producer:
defmodule GenstageTest.Producer do
  use GenStage

  def start_link(initial \\ 1) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end
end
Here is one of the producer_consumers:
defmodule GenstageTest.PcTimesFive do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    {:producer_consumer, state, subscribe_to: [GenstageTest.PcAddOne]}
  end

  def handle_events(events, _from, state) do
    numbers =
      events
      |> Enum.map(&(&1 * 5))

    {:noreply, numbers, state}
  end
end
and here is the final consumer:
defmodule GenstageTest.Consumer do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state, subscribe_to: [GenstageTest.PcDivTwo]}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect({self(), event, state})
    end

    # As a consumer we never emit events
    {:noreply, [], state}
  end
end
It is all modelled off the Elixir School GenStage tutorial.
All the modules and the mix.exs can be found on GitHub.
I have managed to get runtime subscriptions working. Here are some modified producers, producer_consumers, and consumers respectively:
Producer:
defmodule GenstageTest.Producer do
  use GenStage

  def start_link(initial \\ 1) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end

  def handle_info({:doprint}, state) do
    IO.puts "yep"
    {:noreply, [], state}
  end

  def handle_info({:cancel, sublink}, state) do
    GenStage.cancel sublink, []
    {:noreply, [], state}
  end
end
Producer_consumer:
defmodule GenstageTest.PcAddOne do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    {:producer_consumer, state}
  end

  def handle_events(events, _from, state) do
    numbers =
      events
      |> Enum.map(&(&1 + 1))

    {:noreply, numbers, state}
  end
end
Consumer:
defmodule GenstageTest.Consumer do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect event
      #File.write("/home/tbrowne/scratch/output.txt",
      #  Kernel.inspect(event) <> " ", [:append])
      :timer.sleep(100)
    end

    # As a consumer we never emit events
    {:noreply, [], state}
  end
end
Now, once these are all available in the lib directory (remember to add {:gen_stage, "~> 0.11"} to your mix.exs deps), or copied and pasted into IEx, the following will work perfectly:
{:ok, p} = GenstageTest.Producer.start_link(0)
{:ok, a1} = GenstageTest.PcAddOne.start_link()
{:ok, c} = GenstageTest.Consumer.start_link()
{:ok, link1} = GenStage.sync_subscribe(a1, to: p, min_demand: 0, max_demand: 1, cancel: :transient)
{:ok, link2} = GenStage.sync_subscribe(c, to: a1, min_demand: 0, max_demand: 1, cancel: :transient)
The issue now, though, is that I still don't know how to cancel the subscription. There is a cancel function and there is also a stop function. GenStage.stop(c), for example, seems to do nothing, while my various attempts at GenStage.cancel/3 only give errors.
To recap, what I need now is to be able to stop certain stages and replace them with others. What is the syntax for cancelling a subscription, and from where is it called? It is not well explained in the docs, as there is no concrete example.
You can absolutely change the pipeline at runtime; check out the first example in the GenStage documentation. You can also use the :manual mode to get fine-grained control over demand, and there is an API to cancel subscriptions (GenStage.cancel/3). I think these are enough to manage GenStage pipelines dynamically.
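For the cancellation part specifically, here is a minimal sketch, not a definitive implementation. It assumes the gen_stage dependency is available, and the module name SwitchableConsumer and the {:unsubscribe, tag} message are hypothetical names made up for illustration. The idea is that the consumer remembers the {producer_pid, subscription_tag} tuple it receives in handle_subscribe/4 and passes that tuple, rather than the bare tag, to GenStage.cancel/3 from inside its own process.

```elixir
defmodule GenstageTest.SwitchableConsumer do
  use GenStage

  def start_link do
    # State is a map of subscription_tag => {producer_pid, subscription_tag}
    GenStage.start_link(__MODULE__, %{})
  end

  def init(subs), do: {:consumer, subs}

  # Invoked in the consumer when it subscribes to a producer.
  # `from` is the {producer_pid, subscription_tag} tuple that
  # GenStage.cancel/3 expects as its first argument.
  def handle_subscribe(:producer, _opts, {_pid, tag} = from, subs) do
    {:automatic, Map.put(subs, tag, from)}
  end

  # Invoked once a subscription has been cancelled; forget it.
  def handle_cancel(_reason, {_pid, tag}, subs) do
    {:noreply, [], Map.delete(subs, tag)}
  end

  # Hypothetical message: ask this consumer to drop one subscription.
  def handle_info({:unsubscribe, tag}, subs) do
    case Map.fetch(subs, tag) do
      # :shutdown is a reason that a `cancel: :transient` subscription
      # tolerates, so the consumer should keep running afterwards.
      {:ok, from} -> GenStage.cancel(from, :shutdown)
      :error -> :ok
    end

    {:noreply, [], subs}
  end

  def handle_events(events, _from, subs) do
    Enum.each(events, &IO.inspect/1)
    # As a consumer we never emit events
    {:noreply, [], subs}
  end
end

# Possible usage in IEx, assuming `a1` is a running producer_consumer
# and `other` is the stage that should replace it:
#
#   {:ok, c} = GenstageTest.SwitchableConsumer.start_link()
#   {:ok, tag} = GenStage.sync_subscribe(c, to: a1, max_demand: 1, cancel: :transient)
#   send(c, {:unsubscribe, tag})
#   {:ok, _tag} = GenStage.sync_subscribe(c, to: other, max_demand: 1, cancel: :transient)
```

The cancel: option on the subscription matters here: with the default :permanent mode the consumer exits whenever a subscription is cancelled, so a stage that should survive being rewired needs :transient (with a :normal or :shutdown reason) or :temporary.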