Scenario: A GenStage producer handles a Twitter stream (using the Streaming API and ExTwitter) and provides a set of tweets (at most as many as the consumer demands) to a GenStage consumer. The consumer then just prints them.
Problem: I am looking for specific tweets, so new tweets are not always available. If the GenStage producer returns an empty list of events, the consumer stops asking. See this issue and José Valim's reply for more.
I am not sure how to address this issue. Any help is greatly appreciated. This is what I have so far:
```elixir
defmodule MyApp.TwitterProducer do
  use GenStage
  alias MyApp.TwitterStream

  def start_link(:ok) do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    # This creates a regular Elixir Stream.
    # I use it as the state so that a new stream
    # is not initiated every time the consumer
    # asks for new data.
    stream = TwitterStream.get_stream()
    {:producer, stream}
  end

  def handle_demand(demand, stream) do
    # Take tweets from the stream, turn them
    # into a list, and return them to the consumer.
    chunk = Stream.take(stream, demand)
    events = Enum.to_list(chunk)
    {:noreply, events, stream}
  end

  def handle_info(_msg, state) do
    # I was getting a "wrong message" error
    # before I implemented this function myself.
    # It does nothing special for my case.
    {:noreply, [], state}
  end
end

defmodule MyApp.TwitterConsumer do
  use GenStage

  def start_link() do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    {:consumer, :the_state_does_not_matter}
  end

  def handle_events(events, _from, state) do
    Process.sleep(3000)
    IO.inspect(events)
    # We are a consumer, so we never emit events.
    {:noreply, [], state}
  end
end

# Let's fire this thing up
{:ok, p} = GenStage.start_link(MyApp.TwitterProducer, :ok, name: MyApp.TwitterProducer)
{:ok, c} = GenStage.start_link(MyApp.TwitterConsumer, :ok, name: MyApp.TwitterConsumer)
GenStage.sync_subscribe(c, to: p, max_demand: 3)
```
What happens: this runs for a while, then stops. As far as I understand, it stops as soon as the producer returns an empty event list.
Edit: Interestingly enough, if I set the demand to 1, it keeps running. But it is much, much slower than querying the Twitter Streaming API directly: I receive ten times fewer tweets. My theory is that this is due to the repeated Stream.take calls instead of a single Enum.to_list over the whole stream. But I still find it very confusing. Any ideas what I am missing?
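One plausible explanation for the slowdown, assuming `TwitterStream.get_stream` returns a lazy `Stream.resource`-style stream (as ExTwitter's streaming functions presumably do): a stream kept in the state is only a recipe, so every `Stream.take(stream, demand) |> Enum.to_list()` enumerates it from scratch and re-runs its start function, which for a real Twitter stream would mean reconnecting. The stdlib-only sketch below simulates the "connection" with a message to `self()`; `RestartDemo` and `fake_stream/0` are hypothetical names.

```elixir
# Sketch: a lazy Stream.resource re-runs its start function every time it is
# enumerated. Opening a "connection" is simulated by sending a message to self().
defmodule RestartDemo do
  def fake_stream do
    Stream.resource(
      # start_fun: stands in for opening a (hypothetical) streaming connection
      fn ->
        send(self(), :connection_opened)
        0
      end,
      # next_fun: emits an ever-increasing integer and never halts by itself
      fn n -> {[n], n + 1} end,
      # after_fun: stands in for closing the connection
      fn _ -> :ok end
    )
  end

  # Drains and counts the :connection_opened messages in the mailbox.
  def count_opens(acc \\ 0) do
    receive do
      :connection_opened -> count_opens(acc + 1)
    after
      0 -> acc
    end
  end
end

stream = RestartDemo.fake_stream()

# Two separate "handle_demand" calls, each taking 3 items from the same value.
first = stream |> Stream.take(3) |> Enum.to_list()
second = stream |> Stream.take(3) |> Enum.to_list()
opens = RestartDemo.count_opens()

IO.inspect(first)   # [0, 1, 2]
IO.inspect(second)  # [0, 1, 2] again: the resource started over
IO.inspect(opens)   # 2
```

If this assumption holds for the real stream, the second batch is not "the next tweets" but a fresh connection each time.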
There is an important (but unfortunately not bold) sentence in the documentation of `GenStage.handle_demand/2`:
The producer must either store the demand or return the events requested.
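Storing the demand usually means keeping a counter of unmet demand in the producer state and emitting buffered events whenever they arrive (a producer may also return `{:noreply, events, state}` from callbacks such as `handle_info/2`). The sketch below shows only that bookkeeping as pure functions, so it runs without GenStage; the `PendingDemand` module and its `{pending, buffer}` state shape are hypothetical, and how tweets would actually reach the producer is left out.

```elixir
# Hypothetical helper module (not part of GenStage): pure bookkeeping that a
# producer's state could use to "store the demand" instead of blocking.
defmodule PendingDemand do
  # State: {pending_demand, buffered_events}
  def new, do: {0, []}

  # From handle_demand/2: add the new demand, then emit what is available.
  def add_demand({pending, buffer}, demand), do: dispatch({pending + demand, buffer})

  # When new tweets arrive (e.g. in handle_info/2): buffer them, then emit.
  def add_events({pending, buffer}, events), do: dispatch({pending, buffer ++ events})

  # Emit at most `pending` events; keep the rest buffered and remember
  # the demand that could not be satisfied yet.
  defp dispatch({pending, buffer}) do
    {to_emit, rest} = Enum.split(buffer, pending)
    {to_emit, {pending - length(to_emit), rest}}
  end
end

state = PendingDemand.new()
{emitted1, state} = PendingDemand.add_demand(state, 3)  # no tweets yet
{emitted2, state} = PendingDemand.add_events(state, [:t1, :t2])
{emitted3, state} = PendingDemand.add_events(state, [:t3, :t4])

IO.inspect({emitted1, emitted2, emitted3, state})
# {[], [:t1, :t2], [:t3], {0, [:t4]}}
```

The consumer asked once for 3 events and received all of them over two later deliveries; nothing was lost when no tweets were available. For a long-lived buffer, `:queue` would avoid the repeated `++`.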
That said, instead of blocking on Stream.take, one could explicitly acknowledge that the task might block and handle that case, storing the demand when it happens, by using `Task.await/2` with a reasonable timeout (`Task.yield/2` might help with more complicated checks, but here it seems like overkill).
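For comparison, a sketch of the `Task.yield/2` variant: instead of catching the exit raised by `Task.await/2`, it waits up to a timeout and, if no result came back, stops the task with `Task.shutdown/2` so it does not keep running in the background. `Bounded.run/2` is a hypothetical helper name; inside `handle_demand/2`, `:timeout` would correspond to returning no events (ideally after storing the demand).

```elixir
# Hypothetical helper: run a possibly blocking function in a task and give it
# at most `timeout` ms. Returns {:ok, result}, {:error, reason} or :timeout.
defmodule Bounded do
  def run(fun, timeout) do
    task = Task.async(fun)

    # Task.yield/2 returns nil on timeout instead of exiting the caller;
    # Task.shutdown/2 then stops the task (and returns its reply if one
    # arrived in the meantime).
    case Task.yield(task, timeout) || Task.shutdown(task) do
      {:ok, result} -> {:ok, result}
      # Only reachable if the caller traps exits, since the task is linked.
      {:exit, reason} -> {:error, reason}
      nil -> :timeout
    end
  end
end

fast = Bounded.run(fn -> Enum.to_list(1..3) end, 1_000)
slow = Bounded.run(fn -> Process.sleep(5_000) end, 100)

IO.inspect(fast)  # {:ok, [1, 2, 3]}
IO.inspect(slow)  # :timeout
```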
From the documentation:
If you don’t want the task to fail then you must change the `heavy_fun/0` code in the same way you would achieve it if you didn’t have the async call. For example, to either return `{:ok, val} | :error` results or, in more extreme cases, by using `try/rescue`.
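The pattern the quoted passage describes can be sketched as follows: the possibly failing work is wrapped so that the task returns `{:ok, val}` or `:error` instead of raising (with `Task.async/1`, a raising task would also take down the linked caller). `SafeWork`, `heavy_fun/1` and `safe_heavy_fun/1` are hypothetical stand-ins for the docs' `heavy_fun/0`.

```elixir
# Hypothetical example: report failure as a value instead of raising in the task.
defmodule SafeWork do
  # Stands in for heavy_fun/0; raises on bad input.
  def heavy_fun(input) when is_integer(input), do: input * 2
  def heavy_fun(_input), do: raise(ArgumentError, "expected an integer")

  # Returns {:ok, val} | :error, using try/rescue as the docs suggest.
  def safe_heavy_fun(input) do
    try do
      {:ok, heavy_fun(input)}
    rescue
      ArgumentError -> :error
    end
  end
end

good = Task.async(fn -> SafeWork.safe_heavy_fun(21) end) |> Task.await()
bad = Task.async(fn -> SafeWork.safe_heavy_fun("oops") end) |> Task.await()

IO.inspect({good, bad})  # {{:ok, 42}, :error}
```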
The documentation lacks examples, though. On the other hand, here it would probably be easier to just return an empty list and forget about storing the demand:
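```elixir
def handle_demand(demand, stream) do
  try do
    # Run the potentially blocking Stream.take in a separate process
    task =
      Task.async(fn ->
        stream
        |> Stream.take(demand)
        |> Enum.to_list()
      end)

    # Wait at most one second for the events
    Task.await(task, 1000)
  catch
    # Task.await/2 exits the caller on timeout; catch that exit and
    # return no events. The task itself is not stopped here.
    :exit, {:timeout, {Task, :await, [_, 1000]}} ->
      {:noreply, [], stream}
  else
    events when is_list(events) ->
      {:noreply, events, stream}
  end
end
```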