I know how to use Stream.resource/3 to take the first 5 lines of a file and put them in a list:
```elixir
str =
  Stream.resource(
    fn -> File.open!("./data/fidap011.mtx") end,
    fn file ->
      case IO.read(file, :line) do
        data when is_binary(data) -> {[data], file}
        _ -> {:halt, file}
      end
    end,
    fn file -> File.close(file) end
  )

str |> Enum.take(5)
```
But how do I then take, say, the next 5 lines from the same stream? If I run this again:

```elixir
str |> Enum.take(5)
```

I just get the same first 5 lines.
Am I missing something obvious here?
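Getting the same lines twice is what lazy streams do: each enumeration starts the resource over. A minimal sketch, using a hypothetical in-memory counter in place of the file so it runs without data/fidap011.mtx:

```elixir
# Hypothetical stand-in for the file: a resource that emits 1, 2, 3, ...
counter =
  Stream.resource(
    fn -> 1 end,               # start_fun: runs again on every enumeration
    fn n -> {[n], n + 1} end,  # next_fun: emit n, carry n + 1 forward
    fn _n -> :ok end           # after_fun: nothing to release here
  )

first = Enum.take(counter, 5)  # [1, 2, 3, 4, 5]
again = Enum.take(counter, 5)  # [1, 2, 3, 4, 5] again: the resource restarted

# Skipping ahead works, but the resource still restarts and the
# dropped items are generated (or, for a file, read) again.
next_five = counter |> Stream.drop(5) |> Enum.take(5)  # [6, 7, 8, 9, 10]
```

Stream.drop/2 gives the "next 5", but on a file it re-reads the skipped lines every time, so it does not scale to reading a large file batch by batch.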
Eventually, I want to read enough data from my stream to spawn some processes that work on that data. When some of those processes complete, I want to read more from the same stream and process the next batch, and so on. Should Stream.chunk() come into play here? Without an example I can't work out how.
EDIT - several design iterations later!
For my purposes it is easier not to use Stream at all. Instead, I simply create a file pointer/process using

```elixir
{:ok, fp} = File.open("data/fidap011.mtx")
```
Then I pass that fp to 30,000 different spawned processes, and they have no difficulty reading from it whenever they like. Each of those processes changes its state by reading its new state variables from the file. In the module below, oR and vR are two "router" processes that receive messages; the code is part of a sparse matrix/vector multiplier.
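This works because File.open/2 returns the pid of a process that owns the file and tracks the current read position. Every IO.read/2 is a request to that process, which handles one request at a time, so no two readers receive the same line. A minimal sketch, with a hypothetical temp file and three Task processes standing in for the matrix-cell processes:

```elixir
# Hypothetical sample file with six lines, so the sketch is self-contained.
path = Path.join(System.tmp_dir!(), "shared_fp_demo.txt")
File.write!(path, Enum.map_join(1..6, "", fn i -> "line #{i}\n" end))

# One IO device process, shared by every reader below.
fp = File.open!(path)

# Three concurrent readers each take two lines from the same device.
lines =
  1..3
  |> Enum.map(fn _ ->
    Task.async(fn -> [IO.read(fp, :line), IO.read(fp, :line)] end)
  end)
  |> Enum.flat_map(&Task.await/1)

File.close(fp)
# Which reader got which line varies, but each line is read exactly once.
```

One caveat: the device process is linked to the process that called File.open/2, so if that process exits, the file is closed for every reader.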
```elixir
defmodule M_Cells do
  @moduledoc """
  Provides matrix related code
  Each cell process serves for that row & col
  """

  defp get_next_state(fp) do
    case IO.read(fp, :line) do
      data when is_binary(data) ->
        [rs, cs, vs] = String.split(data)
        r = String.to_integer(rs)
        c = String.to_integer(cs)
        v = String.to_float(vs)
        {r, c, v}

      _ ->
        File.close(fp)
        :fail
    end
  end

  defp loop(fp, r, c, v, oR, vR) do
    # Maintains state of Matrix Cell, row, col, value
    # receives msgs and responds
    receive do
      :start ->
        send(vR, {:multiply, c, self()}) # get values for operands via router vR
        loop(fp, r, c, v, oR, vR)

      {:multiply, w} -> # handle request to multiply by w and relay to router oR
        send(oR, {:sum, r, v * w})

        case get_next_state(fp) do # read line from file and fill in rcv
          {r1, c1, v1} ->
            send(vR, {:multiply, c1, self()})
            loop(fp, r1, c1, v1, oR, vR)

          _ -> ## error or end of file etc
            ##IO.puts(":kill rcv: #{r},#{c},#{v}")
            Process.exit(self(), :kill)
        end
    end
  end

  # Launch each matrix cell using iteration by tail recursion
  def launch(_fp, _oR, _vR, result, 0) do
    result |> Enum.reverse() # reverse is cosmetic, not substantive
  end

  def launch(fp, oR, vR, result, count) do
    #IO.inspect count
    case get_next_state(fp) do
      {r, c, v} ->
        pid = spawn(fn -> loop(fp, r, c, v, oR, vR) end)
        launch(fp, oR, vR, [pid | result], count - 1)

      _ -> ## error or end of file etc, skip to count 0
        launch(fp, oR, vR, result, 0)
    end
  end
end
```
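One detail in get_next_state/1: String.to_float/1 raises an ArgumentError for a value written without a decimal point, such as "3", which a Matrix Market file may contain. A minimal sketch of a more lenient parser, using a hypothetical EntryParse module with Float.parse/1 swapped in for String.to_float/1:

```elixir
defmodule EntryParse do
  # Hypothetical helper: parses one "row col value" line the same way
  # get_next_state/1 does, but with Float.parse/1 instead of
  # String.to_float/1, so "3" is read as 3.0 instead of raising.
  def parse(line) do
    # String.split/1 splits on whitespace and drops the trailing newline.
    [rs, cs, vs] = String.split(line)
    # Float.parse/1 returns {float, rest}; require that nothing is left over.
    {v, ""} = Float.parse(vs)
    {String.to_integer(rs), String.to_integer(cs), v}
  end
end
```

For example, `EntryParse.parse("3 7 1.5\n")` returns `{3, 7, 1.5}`, and `EntryParse.parse("2 4 3")` returns `{2, 4, 3.0}`.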
Enjoy!
As a side note, creating a stream from a file is such a common task that the standard library already covers it: you can simply use File.stream!/3 to create the stream, with no need to call Stream.resource/3 directly.
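For example, the "first five lines" from the question can be taken like this. The sketch writes a hypothetical temp file so it runs without data/fidap011.mtx:

```elixir
# Hypothetical sample file with eight lines.
path = Path.join(System.tmp_dir!(), "file_stream_demo.txt")
File.write!(path, Enum.map_join(1..8, "", fn i -> "line #{i}\n" end))

# File.stream!/1 streams line by line by default; the file is opened
# when enumeration starts and closed when it finishes or halts.
first_five =
  path
  |> File.stream!()
  |> Enum.take(5)
```

Like the Stream.resource/3 version, a File.Stream reopens the file on every enumeration, so taking from it again also starts at line 1.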
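Regarding your original question: the second Enum.take(5) returns the same lines because a stream is only a recipe. Every time it is enumerated, Stream.resource/3 runs its start function again, reopening the file and reading from the beginning. And yes, you are on the right track: Stream.chunk_every/2 (the replacement for the deprecated Stream.chunk/2) is the way to go here. It lazily splits the stream into chunks of the given size: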
```elixir
File.stream!("./data/fidap011.mtx")
|> Stream.chunk_every(5)
|> Enum.each(fn chunk ->
  # do something with chunk
end)
```
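For the workflow in the question (keep a bounded number of worker processes busy and read more input only as they finish), one option not mentioned above is to feed the chunks to Task.async_stream/3 with the :max_concurrency option; it starts a new task only when a running one completes. A minimal sketch, with a range standing in for the file's lines and Enum.sum/1 as placeholder work:

```elixir
# Placeholder input: a range stands in for File.stream!/1 lines.
results =
  1..23
  |> Stream.chunk_every(5)              # lazily group into lists of up to 5
  |> Task.async_stream(
    fn chunk -> Enum.sum(chunk) end,    # placeholder work for one chunk
    max_concurrency: 2                  # at most 2 chunks in flight at once
  )
  |> Enum.map(fn {:ok, sum} -> sum end) # one result per chunk

# results == [15, 40, 65, 90, 66]
```

The last chunk holds only the three leftover items. Results come back in input order by default (`ordered: true`); pass `ordered: false` if completion order is acceptable.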