Reading through the documentation, Stream.resource seems intended only for creating a resource that one can read/get values from, not write/put to. Have I understood correctly, or am I reading it wrong? And if I've understood it correctly, what type of resource must I create to write/put to from a stream, a Collectable?
You have read the documentation correctly: Stream.resource is only a convenience function for emitting values. It is also correct that if you want to consume values, you need to implement the Collectable protocol. You can look at the source code of File.Stream, which implements both Enumerable and Collectable.
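To make the "emitting values" side concrete, here is a minimal sketch of a source built with Stream.resource/3. The countdown itself is a made-up example, not something from the documentation; it only illustrates the three functions (start, next, after) that Stream.resource/3 expects.

```elixir
# Hypothetical countdown source; the name and values are illustrative only.
countdown =
  Stream.resource(
    # start_fun: produce the initial accumulator
    fn -> 3 end,
    # next_fun: emit a list of values plus the next accumulator, or halt
    fn
      0 -> {:halt, 0}
      n -> {[n], n - 1}
    end,
    # after_fun: clean up once the stream is finished or halted
    fn _acc -> :ok end
  )

Enum.to_list(countdown)
# => [3, 2, 1]
```

Nothing in these three functions receives values from the outside, which is why the write side needs a different protocol.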
For demonstration purposes, here’s a ChunkedWriter module that buffers values and flushes them whenever the buffer reaches its chunk size:
    defmodule ChunkedWriter do
      # The agent's state is {buffer, chunk_size}; the buffer is kept in reverse order.
      def open(chunk_size) do
        Agent.start_link(fn -> {[], chunk_size} end)
      end

      # Prepend the value and flush as soon as the buffer reaches chunk_size.
      def write(agent, value) do
        Agent.update(agent, fn {old_buffer, chunk_size} ->
          buffer = [value | old_buffer]

          new_buffer =
            cond do
              length(buffer) < chunk_size -> buffer
              true -> do_flush(buffer)
            end

          {new_buffer, chunk_size}
        end)
      end

      def flush(agent) do
        Agent.update(agent, fn {buffer, chunk_size} ->
          {do_flush(buffer), chunk_size}
        end)
      end

      # Print the buffered values in insertion order and return an empty buffer.
      defp do_flush(buffer) do
        buffer |> Enum.reverse() |> Enum.each(&IO.puts/1)
        IO.puts("---")
        []
      end

      # Flush whatever is left, then stop the agent.
      def close(agent) do
        flush(agent)
        Agent.stop(agent)
      end

      def stream(chunk_size) do
        %ChunkedWriter.Stream{chunk_size: chunk_size}
      end
    end
This module would be used like this:
    # open/1 returns {:ok, pid} from Agent.start_link, so match on the tuple
    {:ok, writer} = ChunkedWriter.open(3)
    ChunkedWriter.write(writer, 1)
    ChunkedWriter.write(writer, 2)
    ChunkedWriter.write(writer, 3)
    ChunkedWriter.write(writer, 4)
    ChunkedWriter.write(writer, 5)
    ChunkedWriter.close(writer)
This outputs:
    1
    2
    3
    ---
    4
    5
    ---
Now the ChunkedWriter.stream/1 function just builds a struct; protocol calls on that struct are dispatched to the implementations defined for ChunkedWriter.Stream. Here is the ChunkedWriter.Stream module with its Collectable implementation, so we can pipe an Enumerable into it:
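    defmodule ChunkedWriter.Stream do
      defstruct chunk_size: 1

      defimpl Collectable do
        def into(stream = %ChunkedWriter.Stream{chunk_size: chunk_size}) do
          # One writer process per collection; the collector closes over it.
          {:ok, writer} = ChunkedWriter.open(chunk_size)

          {stream,
           fn
             # A new value arrived: hand it to the writer.
             _acc, {:cont, value} ->
               ChunkedWriter.write(writer, value)

             # The source is exhausted: flush the rest and return the struct.
             _acc, :done ->
               :ok = ChunkedWriter.close(writer)
               stream

             # Collection was aborted: still flush and stop the writer.
             _, :halt ->
               :ok = ChunkedWriter.close(writer)
           end}
        end
      end
    end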
In action:
    Stream.cycle([1, 2, 3])
    |> Stream.take(10)
    |> Stream.into(ChunkedWriter.stream(4))
    |> Stream.run()
This prints:
    1
    2
    3
    1
    ---
    2
    3
    1
    2
    ---
    3
    1
    ---
You are correct. Stream.resource/3 is a convenience for defining sources. Underneath, Stream.resource/3 implements an Enumerable, which is the protocol all sources (and transformers like map) implement.
The Enumerable protocol is good at taking values out of a data source and passing them forward; however, it does not help with collecting those values into something sensible. For that, you would implement the Collectable protocol.
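As a rough illustration of the collecting side, the sketch below uses only built-in collectables (a map and a MapSet). The second half calls Collectable.into/1 by hand to show the initial accumulator and collector function that Enum.into/2 normally drives for you; the variable names are arbitrary.

```elixir
# Enum.into/2 drives the Collectable protocol for us.
Enum.into([a: 1, b: 2], %{})
# => %{a: 1, b: 2}

# Calling the protocol directly shows the moving parts:
# an initial accumulator and a collector function.
{initial_acc, collector} = Collectable.into(MapSet.new())

result =
  [1, 2, 2, 3]
  # feed each value to the collector with the {:cont, value} command
  |> Enum.reduce(initial_acc, fn value, acc -> collector.(acc, {:cont, value}) end)
  # signal the end of the input to get the finished collectable back
  |> collector.(:done)

result |> MapSet.to_list() |> Enum.sort()
# => [1, 2, 3]
```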
Well, you may wonder, what if you want both?
You can look at the File.Stream or GenEvent.Stream implementations in the Elixir source code for an example. In short, assuming you have a MyData module, you could define a MyData.stream/0 function that returns a %MyData.Stream{} struct implementing both the Enumerable and Collectable protocols, allowing you to provide both mechanisms.
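A minimal sketch of that idea might look like the following. It is not how File.Stream is implemented; it assumes a purely in-memory struct with a hypothetical items field, and it delegates iteration to the list that the struct wraps.

```elixir
defmodule MyData.Stream do
  # Hypothetical struct: `items` holds the collected values in order.
  defstruct items: []

  defimpl Enumerable do
    # Iterate by delegating to the wrapped list.
    def reduce(%{items: items}, acc, fun), do: Enumerable.reduce(items, acc, fun)

    # Returning {:error, __MODULE__} tells Enum to fall back to reduce/3.
    def count(_stream), do: {:error, __MODULE__}
    def member?(_stream, _value), do: {:error, __MODULE__}
    def slice(_stream), do: {:error, __MODULE__}
  end

  defimpl Collectable do
    def into(%{items: items} = stream) do
      collector = fn
        # Accumulate incoming values in reverse order.
        acc, {:cont, value} -> [value | acc]
        # Append the new values to the existing items when done.
        acc, :done -> %{stream | items: items ++ Enum.reverse(acc)}
        # Nothing to clean up if the collection is aborted.
        _acc, :halt -> :ok
      end

      {[], collector}
    end
  end
end

defmodule MyData do
  # Returns a struct that can be both enumerated and collected into.
  def stream, do: %MyData.Stream{}
end

stream = Enum.into([1, 2, 3], MyData.stream())
Enum.map(stream, &(&1 * 2))
# => [2, 4, 6]
```

MyData.Stream is defined before MyData here so that the struct already exists when MyData.stream/0 is compiled in a single script.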