I'm beginning to learn Elixir and have come across a challenge that I haven't been able to solve easily.
I'm trying to create a function that takes an Enumerable.t and returns another Enumerable.t where each item is grouped with up to n items that follow it. It would behave slightly differently from Enum.chunk(e, n, 1, []) in that the number of emitted groups would always equal the number of items in the original enumerable. I also need to support Streams.
@spec lookahead(Enumerable.t, non_neg_integer) :: Enumerable.t
This is best illustrated with doctest syntax:
iex> lookahead(1..6, 1) |> Enum.to_list
[[1,2],[2,3],[3,4],[4,5],[5,6],[6]]
iex> lookahead(1..4, 2) |> Enum.to_list
[[1,2,3],[2,3,4],[3,4],[4]]
iex> Stream.cycle(1..4) |> lookahead(2) |> Enum.take(5)
[[1,2,3],[2,3,4],[3,4,1],[4,1,2],[1,2,3]]
iex> {:ok,io} = StringIO.open("abcd")
iex> IO.stream(io,1) |> lookahead(2) |> Enum.to_list
[["a","b","c"],["b","c","d"],["c","d"],["d"]]
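For comparison with the Enum.chunk behaviour mentioned above, here is a minimal sketch of one compact approach. It is a different technique from the hand-rolled stream in the answer below: pad the input with n sentinel values so that every real item starts exactly one full window, chunk with a step of 1, then strip the sentinels again. It assumes Elixir 1.5 or later, where Stream.chunk_every/4 replaced the deprecated chunk functions; the module name ChunkLookahead is hypothetical, and the approach has not been benchmarked here.

```elixir
defmodule ChunkLookahead do
  # Hypothetical module: lazy lookahead built from standard Stream functions.
  @spec lookahead(Enumerable.t(), non_neg_integer) :: Enumerable.t()
  def lookahead(enum, n) do
    # A freshly created reference, so it cannot already occur in the input.
    pad = make_ref()

    enum
    # Append n sentinels so every real item starts a full window of n + 1.
    |> Stream.concat(List.duplicate(pad, n))
    # Sliding windows of n + 1 items, advancing one item at a time.
    |> Stream.chunk_every(n + 1, 1, :discard)
    # Remove the sentinels from the trailing windows.
    |> Stream.map(fn window -> Enum.reject(window, &(&1 == pad)) end)
  end
end
```

Everything stays lazy, so infinite streams such as Stream.cycle/1 and IO streams are only read as far as the consumer asks; the trade-off is an extra Enum.reject/2 pass over every window.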
I have investigated implementing the Enumerable protocol, but haven't quite understood the Enumerable.reduce interface.
Is there any succinct/elegant way of doing this?
My use case is a small fixed n value (1 or 2) on a binary stream, so extra points for an optimized version. However, for the purpose of learning Elixir, I'm interested in solutions that work across a range of use cases. Performance is important. I will benchmark the solutions across various values of n and publish the results.
Benchmark Update - 8th April 2015
Six workable solutions have been posted. Details of the benchmarks are available at https://gist.github.com/spitsw/fce5304ec6941578e454. Benchmarks were run over a list of 500 items for various values of n.
Results for n=1:
PatrickSuspend.lookahead        104.90 µs/op
Warren.lookahead                174.00 µs/op
PatrickChunk.lookahead          310.60 µs/op
PatrickTransform.lookahead      357.00 µs/op
Jose.lookahead                  647.60 µs/op
PatrickUnfold.lookahead     1484000.00 µs/op

Results for n=50:
PatrickSuspend.lookahead        220.80 µs/op
Warren.lookahead                320.60 µs/op
PatrickTransform.lookahead      518.60 µs/op
Jose.lookahead                 1390.00 µs/op
PatrickChunk.lookahead         3058.00 µs/op
PatrickUnfold.lookahead     1345000.00 µs/op (faster than n=1)
As discussed in the comments, my first attempt had some performance problems and didn't work with streams that have side-effects, such as IO streams. I took the time to dig deeper into the stream library and finally came up with this solution:
defmodule MyStream do
  def lookahead(enum, n) do
    # suspend the upstream reduction after every item so we can pull one at a time
    step = fn val, _acc -> {:suspend, val} end
    next = &Enumerable.reduce(enum, &1, step)
    &do_lookahead(n, :buffer, [], next, &1, &2)
  end

  # stream suspended
  defp do_lookahead(n, state, buf, next, {:suspend, acc}, fun) do
    {:suspended, acc, &do_lookahead(n, state, buf, next, &1, fun)}
  end

  # stream halted: halt the upstream too, so it can release resources
  defp do_lookahead(_n, _state, _buf, next, {:halt, acc}, _fun) do
    next.({:halt, []})
    {:halted, acc}
  end

  # initial buffering: collect n + 1 items
  defp do_lookahead(n, :buffer, buf, next, {:cont, acc}, fun) do
    case next.({:cont, []}) do
      {:suspended, val, next} ->
        new_state = if length(buf) < n, do: :buffer, else: :emit
        do_lookahead(n, new_state, buf ++ [val], next, {:cont, acc}, fun)

      {_, _} ->
        do_lookahead(n, :emit, buf, &exhausted/1, {:cont, acc}, fun)
    end
  end

  # emitting: emit the buffer, then shift it left and append the next item
  defp do_lookahead(n, :emit, [_ | rest] = buf, next, {:cont, acc}, fun) do
    case next.({:cont, []}) do
      {:suspended, val, next} ->
        do_lookahead(n, :emit, rest ++ [val], next, fun.(buf, acc), fun)

      {_, _} ->
        do_lookahead(n, :emit, rest, &exhausted/1, fun.(buf, acc), fun)
    end
  end

  # buffer empty: the stream is finished
  defp do_lookahead(_n, :emit, [], _next, {:cont, acc}, _fun) do
    {:done, acc}
  end

  # stands in for an upstream that has already finished, so it is never resumed again
  defp exhausted(_command), do: {:done, []}
end
This may look daunting at first, but it's actually not that hard. I will try to break it down for you, although that is difficult with a full-fledged example like this.
Let's start with a simpler example instead: a stream that endlessly repeats the value given to it. To emit a stream, we can return a function that takes two arguments: an accumulator and a reducer function. The accumulator is a tuple {command, acc}: the command (:cont, :suspend or :halt) tells us what the consumer wants us to do, and acc is the consumer's own accumulated value. To emit a value, we call the reducer function with the value and acc; it returns a new {command, acc} tuple, which we handle recursively.

What we return depends on the command. If the stream should be suspended, we return a three-element tuple of the atom :suspended, the accumulator and a function that will be called when the enumeration continues (sometimes called a "continuation"). For the :halt command, we simply return {:halted, acc}, and for :cont we emit a value by performing the recursive step described above. The whole thing then looks like this:
defmodule MyStream do
  def repeat(val) do
    &do_repeat(val, &1, &2)
  end

  # consumer asked us to pause: hand back a continuation
  defp do_repeat(val, {:suspend, acc}, fun) do
    {:suspended, acc, &do_repeat(val, &1, fun)}
  end

  # consumer asked us to stop
  defp do_repeat(_val, {:halt, acc}, _fun) do
    {:halted, acc}
  end

  # consumer wants more: emit val and recurse on the command it returns
  defp do_repeat(val, {:cont, acc}, fun) do
    do_repeat(val, fun.(val, acc), fun)
  end
end
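To make the protocol concrete, the sketch below drives the same kind of function by hand instead of through Enum. The module name RepeatDemo is hypothetical (it copies MyStream.repeat/1 so the block runs on its own), and the collecting reducer step is an illustrative assumption: it returns {:suspend, ...} after every value so that each continuation can be observed.

```elixir
defmodule RepeatDemo do
  # Hypothetical copy of MyStream.repeat/1 so this block is self-contained.
  def repeat(val), do: &do_repeat(val, &1, &2)

  defp do_repeat(val, {:suspend, acc}, fun), do: {:suspended, acc, &do_repeat(val, &1, fun)}
  defp do_repeat(_val, {:halt, acc}, _fun), do: {:halted, acc}
  defp do_repeat(val, {:cont, acc}, fun), do: do_repeat(val, fun.(val, acc), fun)
end

stream = RepeatDemo.repeat(:x)

# Normal use: Enum drives any 2-arity function as a stream.
[:x, :x, :x] = Enum.take(stream, 3)

# By hand: this reducer collects values and asks to suspend after each one.
step = fn val, acc -> {:suspend, [val | acc]} end
{:suspended, [:x], cont} = stream.({:cont, []}, step)

# Resuming with :cont emits one more value, then suspends again.
{:suspended, [:x, :x], cont} = cont.({:cont, [:x]})

# Sending :halt stops the stream and hands back the accumulator.
{:halted, [:x, :x]} = cont.({:halt, [:x, :x]})
```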
Now this is only one part of the puzzle. We can emit a stream, but we don't process an incoming stream yet. Again, to explain how that works it makes sense to construct a simpler example. Here, I will build a function that takes an enumerable and just suspends and re-emits for every value.
defmodule MyStream do
  def passthrough(enum) do
    # suspend the upstream reduction after every item
    step = fn val, _acc -> {:suspend, val} end
    # next.({:cont, []}) fetches the next item from the upstream enumerable
    next = &Enumerable.reduce(enum, &1, step)
    &do_passthrough(next, &1, &2)
  end

  defp do_passthrough(next, {:suspend, acc}, fun) do
    {:suspended, acc, &do_passthrough(next, &1, fun)}
  end

  defp do_passthrough(_next, {:halt, acc}, _fun) do
    {:halted, acc}
  end

  defp do_passthrough(next, {:cont, acc}, fun) do
    case next.({:cont, []}) do
      # upstream produced a value: re-emit it
      {:suspended, val, next} ->
        do_passthrough(next, fun.(val, acc), fun)

      # upstream is exhausted: we are done too
      {_, _} ->
        {:done, acc}
    end
  end
end
The passthrough/1 function sets up the next function that gets passed down to do_passthrough. Its job is to fetch the next value from the incoming stream: the internal step function tells Enumerable.reduce to suspend after every item. The rest is pretty similar to repeat, except for the last clause. There, we call next with {:cont, []} to get a new value and process the result with a case expression. If there is a value, we get back {:suspended, val, next}; if not, the input stream has finished and we pass that on to the consumer.
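The next mechanism can also be tried in isolation. This minimal sketch applies the same step function as passthrough/1 to a plain list (the list itself is an arbitrary example). Note that an input that simply runs out reports {:done, acc}, while {:halted, acc} is reserved for reductions stopped with :halt.

```elixir
# A reducer that suspends after every item and hands that item back.
step = fn val, _acc -> {:suspend, val} end

# Start reducing a plain list; it suspends immediately with the first value.
{:suspended, 1, next} = Enumerable.reduce([1, 2], {:cont, []}, step)

# Each call to the continuation with {:cont, []} yields the next value...
{:suspended, 2, next} = next.({:cont, []})

# ...until the input is exhausted, at which point we get {:done, acc}.
{:done, []} = next.({:cont, []})
```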
I hope that clarifies a few things about how to build streams in Elixir manually. Unfortunately, working with streams this way requires an awful lot of boilerplate. If you go back to the lookahead implementation now, you will see that there are only small differences, and those are the actually interesting parts. There are two additional parameters: state, which distinguishes the :buffer and :emit phases, and buf, which is filled with n+1 items in the initial buffering phase. In the emit phase, the current buffer is emitted and then shifted one position to the left on each iteration, with the next input item (if any) appended at the end. We're done when the input stream is exhausted and the buffer has been drained, or when the consumer halts our stream.
I am leaving my original answer here for reference:
Here's a solution that uses Stream.unfold/2 to emit a true stream of values according to your specification. This means you need to add Enum.to_list to the end of your first two examples to obtain the actual values.
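defmodule MyStream do
  def lookahead(stream, n) do
    Stream.unfold(split(stream, n + 1), fn
      # buffer drained: stop
      {[], _stream} ->
        nil

      # emit the buffer, then shift in one more value from the stream
      {[_ | buf] = current, stream} ->
        {value, stream} = split(stream, 1)
        {current, {buf ++ value, stream}}
    end)
  end

  # take n items and return them together with the remaining stream
  defp split(stream, n) do
    {Enum.take(stream, n), Stream.drop(stream, n)}
  end
end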
The general idea is that we keep a buffer (buf) of values already read from the stream. On each iteration, we emit the current buffer, drop its first element, take one value from the stream and append it to the end. This repeats until the buffer is empty.
Example:
iex> MyStream.lookahead(1..6, 1) |> Enum.to_list
[[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6]]
iex> MyStream.lookahead(1..4, 2) |> Enum.to_list
[[1, 2, 3], [2, 3, 4], [3, 4], [4]]
iex> Stream.cycle(1..3) |> MyStream.lookahead(2) |> Enum.take(5)
[[1, 2, 3], [2, 3, 1], [3, 1, 2], [1, 2, 3], [2, 3, 1]]
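The earlier remark about side effects is easy to reproduce. split/2 calls Enum.take/2 and then Stream.drop/2 on the same stream, which assumes that enumerating it twice yields the same leading items. A StringIO-backed IO stream (the input string here is an arbitrary example) does not behave that way:

```elixir
{:ok, io} = StringIO.open("abcd")
stream = IO.stream(io, 1)

# The first enumeration consumes two characters from the device...
["a", "b"] = Enum.take(stream, 2)

# ...so enumerating the "same" stream again continues where the device left off.
["c", "d"] = Enum.take(stream, 2)
```

Each enumeration consumes characters from the device, so the items Stream.drop/2 skips are not the ones Enum.take/2 returned. Even on pure inputs such as lists, every step re-walks the input from the beginning through the accumulated Stream.drop/2 calls, so the total work grows quadratically with the input length.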