Elixir: can I use Stream.resource to progressively read a large data file?

Tags:

elixir

I know how to use Stream.resource() to take the first 5 lines from a file and place them in a list.

str = Stream.resource(fn -> File.open!("./data/fidap011.mtx") end,
                fn file ->
                  case IO.read(file, :line) do
                    data when is_binary(data) -> {[data], file}
                    _ -> {:halt, file}
                  end
                end,
                fn file -> File.close(file) end)
str |> Enum.take(5)

But how do I then take, say, the next 5 lines from the same stream? If I again type:

str |> Enum.take(5)

I just get the same first 5 lines.

Am I missing something obvious here?

Eventually, I want to read enough data from my stream to spawn some processes that work on that data. When some of those processes complete, I want to read more from the same stream and process the next set of data, and so on. Should Stream.chunk() come into play here? Without an example, I can't quite see how to use it.

EDIT - several design iterations later!

For my purposes it is easier not to use Stream. Instead I simply create a file pointer/process using

{:ok, fp} = File.open( "data/fidap011.mtx" )

then I actually pass that fp to 30000 different spawned processes, and they have no difficulty reading from it whenever they like. Each of those processes changes its state by reading its new state variables from the file. In the module below, oR and vR are two "router" processes that receive messages; the code is part of a sparse matrix/vector multiplier. (A sketch of how launch/5 gets called follows the module.)

defmodule M_Cells do
 @moduledoc """
 Provides matrix related code
 Each cell process serves for that row & col
 """

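 # Read the next "row col value" line from fp and return {row, col, value};
 # on end of file (or a read error), close the file and return :fail.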
 defp get_next_state( fp ) do
    case IO.read( fp, :line ) do
        data when is_binary(data) ->
            [rs,cs,vs] = String.split( data )
            r = String.to_integer(rs)
            c = String.to_integer(cs)
            v = String.to_float(vs)
            {r,c,v}
        _ -> 
            File.close( fp )
            :fail
    end
 end


 defp loop(fp, r,c,v, oR,vR) do
  # Maintains state of Matrix Cell, row, col, value 
  # receives msgs and responds
   receive do

    :start  ->  
        send vR, { :multiply, c, self() }  # get values for operands via router vR
        loop(fp, r,c,v, oR,vR)

    { :multiply, w } ->  # handle request to multiply by w and relay to router oR
        send oR, { :sum, r, v*w }
        case get_next_state( fp ) do # read line from file and fill in rcv
            {r1,c1,v1} ->
                send vR, { :multiply, c1, self() }
                loop(fp, r1,c1,v1, oR,vR)
            _ -> ## error or end of file etc
              ##IO.puts(":kill rcv: #{r},#{c},#{v}")
              Process.exit( self(), :kill )
        end
   end
 end

 # Launch each matrix cell using iteration by tail recursion
 def launch(_fp, _oR,_vR, result, 0) do
   result |> Enum.reverse # reverse is cosmetic, not substantive
 end

 def launch(fp, oR,vR, result, count) do
    #IO.inspect count
    case get_next_state( fp ) do
        {r,c,v} ->
            pid = spawn fn -> loop( fp, r,c,v, oR,vR) end
            launch( fp, oR,vR, [pid|result], count-1 )

        _ -> ## error or end of file etc, skip to count 0
            launch( fp, oR,vR, result, 0 )
    end
 end

end
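
A hypothetical launch, assuming the oR and vR router processes have been spawned elsewhere (their code is not shown here) and that each cell is kicked off with a :start message:

{:ok, fp} = File.open( "data/fidap011.mtx" )
cell_pids = M_Cells.launch( fp, oR, vR, [], 30000 )
Enum.each( cell_pids, fn pid -> send pid, :start end )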

enjoy!

1 Answer

As a side note, creating a stream from a file is such a common task that it has already been taken care of: you can simply use File.stream!/3 to create the stream, no need to use Stream.resource/3 directly.
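
For example, a minimal sketch (path taken from the question). Note that each enumeration of the stream re-reads the file from the start, so "the next 5 lines" can be expressed with Stream.drop/2:

stream = File.stream!("./data/fidap011.mtx")

# first 5 lines
stream |> Enum.take(5)

# lines 6 to 10: every enumeration reopens the file,
# so drop the first 5 lines and take the following 5
stream |> Stream.drop(5) |> Enum.take(5)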

Regarding your original question: yes, you are correct, Stream.chunk_every/2 is the way to go here. It lazily splits the stream into chunks of the supplied size:

File.stream!("./data/fidap011.mtx") |> Stream.chunk_every(5) |> Enum.each(fn chunk ->
  # do something with chunk
end)
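
If the eventual goal is to hand each chunk off to worker processes, here is one possible sketch. Task.async_stream/3 is just one way to run the chunks concurrently, and process_chunk/1 stands in for whatever processing each chunk needs:

File.stream!("./data/fidap011.mtx")
|> Stream.chunk_every(5)
|> Task.async_stream(fn chunk -> process_chunk(chunk) end, max_concurrency: 4)
|> Enum.to_list()  # each element is {:ok, result_of_process_chunk}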