
Is Stream.resource only from, not to?

Tags: elixir

Reading through the documentation, Stream.resource seems intended only for creating a resource that one can read/get values from, not write/put to. Have I understood correctly, or am I reading it wrong? And if I've understood it correctly, what type of resource must I create to write/put to from a stream: a Collectable?

stoft asked Dec 11 '14 23:12


2 Answers

You have read the documentation correctly: Stream.resource is only a convenience function for emitting values. It is also correct that if you want to consume values, you need to implement the Collectable protocol. For a reference, look at the source code of File.Stream, which implements both Enumerable and Collectable.
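As a point of comparison for the "emitting" side, here is a minimal sketch of Stream.resource/3 used as a source. The countdown is purely illustrative and not from the question; the three arguments are the start function, the next function, and the cleanup function.

```elixir
countdown =
  Stream.resource(
    # start_fun: produces the initial accumulator
    fn -> 3 end,
    # next_fun: emits a list of values plus the next accumulator, or halts
    fn
      0 -> {:halt, 0}
      n -> {[n], n - 1}
    end,
    # after_fun: cleanup hook, nothing to release in this sketch
    fn _acc -> :ok end
  )

result = Enum.to_list(countdown)
# result == [3, 2, 1]
```

Nothing in this API accepts values pushed in from upstream, which is why the writing side needs Collectable.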

For demonstration purposes, here’s a ChunkedWriter module that buffers values and flushes them once the buffer reaches the chunk size:

defmodule ChunkedWriter do
  def open(chunk_size) do
    Agent.start_link fn -> {[], chunk_size} end
  end

  def write(agent, value) do
    Agent.update agent, fn {old_buffer, chunk_size} ->
      buffer = [value | old_buffer]
      new_buffer = cond do
        length(buffer) < chunk_size -> buffer
        true -> do_flush(buffer)
      end
      {new_buffer, chunk_size}
    end
  end

  def flush(agent) do
    Agent.update agent, fn {buffer, chunk_size} ->
      {do_flush(buffer), chunk_size}
    end
  end

  defp do_flush(buffer) do
    buffer |> Enum.reverse |> Enum.each(&IO.puts/1)
    IO.puts "---"
    []
  end

  def close(agent) do
    flush(agent)
    Agent.stop(agent)
  end

  def stream(chunk_size) do
    %ChunkedWriter.Stream{chunk_size: chunk_size}
  end
end

This module would be used like this:

# open/1 returns {:ok, pid} from Agent.start_link, so match on the tuple
{:ok, writer} = ChunkedWriter.open(3)

ChunkedWriter.write(writer, 1)
ChunkedWriter.write(writer, 2)
ChunkedWriter.write(writer, 3)
ChunkedWriter.write(writer, 4)
ChunkedWriter.write(writer, 5)

ChunkedWriter.close(writer)

This outputs

1
2
3
---
4
5
---

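Now the ChunkedWriter.stream/1 function just builds a struct, so protocol calls on it dispatch to the implementations defined for ChunkedWriter.Stream. Here is the ChunkedWriter.Stream module with its Collectable implementation, which lets us pipe an Enumerable into it. (If both modules live in the same file, define ChunkedWriter.Stream first, because the struct must already be compiled at the point where ChunkedWriter.stream/1 uses it.)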

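defmodule ChunkedWriter.Stream do
  defstruct chunk_size: 1

  defimpl Collectable do
    def into(stream = %ChunkedWriter.Stream{chunk_size: chunk_size}) do
      {:ok, writer} = ChunkedWriter.open(chunk_size)

      # The accumulator is just the stream struct; the real state lives in the writer process.
      collector = fn
        acc, {:cont, value} ->
          :ok = ChunkedWriter.write(writer, value)
          acc

        acc, :done ->
          # Flush whatever is left and stop the writer.
          :ok = ChunkedWriter.close(writer)
          acc

        _acc, :halt ->
          # Collection was aborted; still clean up the writer.
          :ok = ChunkedWriter.close(writer)
      end

      {stream, collector}
    end
  end
end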

In action:

Stream.cycle([1,2,3])
|> Stream.take(10)
|> Stream.into(ChunkedWriter.stream(4))
|> Stream.run

This prints:

1
2
3
1
---
2
3
1
2
---
3
1
---
Patrick Oscity answered Sep 27 '22 23:09


You are correct. Stream.resource/3 is a convenience for defining sources. Underneath, it builds an Enumerable, which is the protocol that all sources (and transformers such as map) implement.

The Enumerable protocol is well suited to taking values out of things and passing them forward, but it does not cover collecting those values into something sensible. For that, you implement the Collectable protocol.
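To make the collecting side concrete, here is a small sketch using only built-in collectables (a map and a MapSet, chosen for illustration). Enum.into/3 drives the Collectable protocol for you; the second half drives the same protocol by hand to show the messages a collector function receives.

```elixir
# Enum.into/3 pulls from an Enumerable and pushes into a Collectable.
squares = Enum.into(1..3, %{}, fn n -> {n, n * n} end)
# squares == %{1 => 1, 2 => 4, 3 => 9}

# The same contract, step by step: into/1 returns an initial
# accumulator and a collector function.
{initial, collector} = Collectable.into(MapSet.new())

acc = collector.(initial, {:cont, :a})  # push one value
acc = collector.(acc, {:cont, :b})      # push another
set = collector.(acc, :done)            # finish and get the result back
```

A collector may also receive :halt when collection is aborted, which is the place to release any resources it opened.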

Well, you may wonder, what if you want both?

You can look at the File.Stream or GenEvent.Stream implementations in the Elixir source for examples. In short, assuming you have a MyData module, you could define a MyData.stream/0 function that returns a %MyData.Stream{} struct implementing both the Enumerable and Collectable protocols, giving you both mechanisms.
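A rough sketch of that idea follows. Everything about how MyData stores its items is an assumption made for illustration: here it is a named Agent holding a list, and start_link/0, put/1 and items/0 are hypothetical helpers. Only the shape (a struct with both protocol implementations) comes from the description above.

```elixir
# Defined first so the struct exists when MyData.stream/0 is compiled.
defmodule MyData.Stream do
  defstruct []

  defimpl Enumerable do
    # Reading: snapshot the stored items and reuse the list implementation.
    def reduce(_stream, acc, fun), do: Enumerable.List.reduce(MyData.items(), acc, fun)

    # Returning {:error, module} falls back to the default reduce-based algorithms.
    def count(_stream), do: {:error, __MODULE__}
    def member?(_stream, _value), do: {:error, __MODULE__}
    def slice(_stream), do: {:error, __MODULE__}
  end

  defimpl Collectable do
    # Writing: every collected value is appended to the store.
    def into(stream) do
      collector = fn
        acc, {:cont, value} ->
          :ok = MyData.put(value)
          acc

        acc, :done ->
          acc

        _acc, :halt ->
          :ok
      end

      {stream, collector}
    end
  end
end

defmodule MyData do
  # Hypothetical storage: a named Agent holding a list, oldest item first.
  def start_link, do: Agent.start_link(fn -> [] end, name: __MODULE__)
  def put(value), do: Agent.update(__MODULE__, fn items -> items ++ [value] end)
  def items, do: Agent.get(__MODULE__, fn items -> items end)

  def stream, do: %MyData.Stream{}
end

{:ok, _pid} = MyData.start_link()

# Collectable side: write into the stream.
Enum.into([1, 2, 3], MyData.stream())

# Enumerable side: read from the same stream.
doubled = MyData.stream() |> Stream.map(&(&1 * 2)) |> Enum.to_list()
# doubled == [2, 4, 6]
```

The same struct works on either end of a pipeline because the two protocols are implemented independently of each other.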

José Valim answered Sep 27 '22 23:09