Scala functional way of processing large scale data with lazy collections

I am trying to figure out memory-efficient AND functional ways to process large-scale string data in Scala. I have read a lot about lazy collections and have seen quite a few code examples. However, I run into "GC overhead limit exceeded" or "Java heap space" errors again and again.

Often the problem is that I try to construct a lazy collection, but evaluate each new element as I append it to the growing collection (I don't know any other way to do so incrementally). Of course, I could try something like initializing a lazy collection first and then yielding the collection holding the desired values by applying the resource-critical computations with map or so, but often I simply do not know the exact size of the final collection a priori, which I would need in order to initialize that lazy collection.

Maybe you could help me with hints or explanations on how to improve the following example code, which splits a FASTA-formatted file (definition below) into two separate files according to the rule that odd sequence pairs belong to one file and even ones to another ("separation of strands"). The "most" straightforward way to do this would be imperative: loop through the lines and print into the corresponding files via open file streams (and this of course works fine). However, I just don't enjoy the style of reassigning to variables holding headers and sequences, so the following example code uses (tail-)recursion, and I would appreciate finding a way to keep a similar design without running into resource problems!

The example works perfectly for small files, but with files of around ~500 MB the code already fails with the standard JVM settings. I want to process files of "arbitrary" size, say 10-20 GB or so.
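For reference, the imperative version I am comparing against would look roughly like this (just a quick sketch of my own to illustrate the looping/printing approach, not code from my actual program; file suffixes are arbitrary placeholders):

import java.io.{File, PrintWriter}
import scala.io.Source

def splitImperative(fileName: String): Unit = {
  // one writer per strand
  val writers = Array(new PrintWriter(new File(fileName + ".1")),
                      new PrintWriter(new File(fileName + ".2")))
  var count = -1   // number of headers seen so far, minus one
  var which = 0    // index of the writer for the current record
  for (line <- Source.fromFile(fileName).getLines()) {
    if (line.startsWith(">")) { count += 1; which = count % 2 }
    writers(which).println(line)   // header and sequence lines go to the current file
  }
  writers.foreach(_.close())
}

The (tail-)recursive version I would actually like to improve is the following: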

val fileName = args(0)
val in = io.Source.fromFile(fileName) getLines

type itType = Iterator[String]
type sType = Stream[(String, String)]

def getFullSeqs(ite: itType) = {
    //val metaChar = ">"
    val HeadPatt = "(^>)(.+)" r
    val SeqPatt  = "([\\w\\W]+)" r
    @annotation.tailrec
    def rec(it: itType, out: sType = Stream[(String, String)]()): sType = 
        if (it hasNext) it next match  {
            case HeadPatt(_,header) =>
                // introduce new header-sequence pair
                rec(it, (header, "") #:: out)
            case SeqPatt(seq) =>
                val oldVal = out head
                // concat subsequences
                val newStream = (oldVal._1, oldVal._2 + seq) #:: out.tail    
                rec(it, newStream)
            case _ =>
                println("something went wrong my friend, oh oh oh!"); Stream[(String, String)]()                
        } else out
    rec(ite)    
}

def printStrands(seqs: sType) {
   import java.io.PrintWriter
   import java.io.File
   def printStrand(seqse: sType, strand: Int) {
        // only use sequences of one strand 
        val indices = List.tabulate(seqse.size/2)(_*2 + strand - 1).view
        val p = new PrintWriter(new File(fileName + "." + strand))
        indices foreach { i =>
              p.print(">" + seqse(i)._1 + "\n" + seqse(i)._2 + "\n")
        }; p.close
       println("Done bro!")
   }
   List(1,2).par foreach (s => printStrand(seqs, s))
}

printStrands(getFullSeqs(in))

Three questions arise for me:

A) Let's assume one needs to maintain a large data structure obtained by processing the initial iterator you get from getLines, as in my getFullSeqs method (note the different size of in and the output of getFullSeqs), because transformations on the whole(!) data are required repeatedly and one does not know in advance which part of the data will be needed at any given step. My example might not be the best, but how would one do that? Is it possible at all?

B) What if the desired data structure is not inherently lazy, say one would like to store the (header -> sequence) pairs in a Map()? Would you wrap it in a lazy collection?

C) My implementation of constructing the stream reverses the order of the input lines. When calling reverse, all elements will be evaluated (in my code they already are, so that is the actual problem). Is there any way to post-process "from behind" in a lazy fashion? I know of reverseIterator, but is that already the solution, or will it not actually evaluate all elements first, too (as I would need to call it on a list)? One could construct the stream with newVal #:: rec(...), but I would lose tail-recursion then, wouldn't I?
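To make that last point concrete, the lazy but no longer tail-recursive variant I have in mind would look roughly like this (an untested sketch; getFullSeqsLazy is just a hypothetical name):

def getFullSeqsLazy(it: Iterator[String]): Stream[(String, String)] = {
  val HeadPatt = "(^>)(.+)".r
  // Each (header, sequence) pair is accumulated strictly, but the recursive call
  // for the next pair sits behind #:: -- lazy and in original order, yet not tail-recursive.
  def rec(header: String, seq: String): Stream[(String, String)] =
    if (!it.hasNext) Stream((header, seq))
    else it.next match {
      case HeadPatt(_, h) => (header, seq) #:: rec(h, "")
      case line           => rec(header, seq + line)
    }
  if (it.hasNext) it.next match {
    case HeadPatt(_, h) => rec(h, "")
    case _              => Stream.empty
  } else Stream.empty
}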

So what I basically need is to add elements to a collection, which are not evaluated by the process of adding. So lazy val elem = "test"; elem :: lazyCollection is not what I am looking for.

EDIT: I have also tried using a by-name parameter for the stream argument in rec.

Thank you so much for your attention and time, I really appreciate any help (again :) ).

/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

FASTA is defined as a sequential set of sequences, each delimited by a single header line. A header is defined as a line starting with ">". Every line below the header is part of the sequence associated with that header. A sequence ends when a new header appears. Every header is unique. Example:

>HEADER1
abcdefg
>HEADER2
hijklmn
opqrstu
>HEADER3
vwxyz
>HEADER4
zyxwv

Thus, sequence 2 is twice as big as seq 1. My program would split that file into a file A containing

>HEADER1
abcdefg
>HEADER3
vwxyz

and a second file B containing

>HEADER2
hijklmn
opqrstu
>HEADER4
zyxwv

The input file is assumed to consist of an even number of header-sequence pairs.



1 Answer

The key to working with really large data structures is to hold in memory only that which is critical to perform whatever operation you need. So, in your case, that's

  • Your input file
  • Your two output files
  • The current line of text

and that's it. In some cases you may need to store extra information, such as how long a sequence is; in such cases, you build those data structures in a first pass and use them in a second pass. Let's suppose, for example, that you decide that you want to write three files: one for even records, one for odd, and one for entries where the total length is less than 300 nucleotides. You would do something like this (warning: it compiles but I never ran it, so it may not actually work):

// First pass: record the total sequence length for each header line.
final def findSizes(
  data: Iterator[String], sz: Map[String,Long] = Map(),
  currentName: String = "", currentSize: Long = 0
): Map[String,Long] = {
  // Fold the record we are currently accumulating (if any) into the map.
  def currentMap = if (currentName != "") sz + (currentName->currentSize) else sz
  if (!data.hasNext) currentMap
  else {
    val s = data.next
    if (s(0) == '>') findSizes(data, currentMap, s, 0)               // new header: start a fresh count
    else findSizes(data, sz, currentName, currentSize + s.length)    // sequence line: extend the count
  }
}

Then, for processing, you use that map and pass through again:

import java.io._
// Second pass: route every record to its target file, using the sizes from the first pass.
final def writeFiles(
  source: Iterator[String], targets: Array[PrintWriter],
  sizes: Map[String,Long], count: Int = -1, which: Int = 0
) {
  if (!source.hasNext) targets.foreach(_.close)
  else {
    val s = source.next
    if (s(0) == '>') {
      // Short records go to the third file; the others alternate between the first two.
      val w = if (sizes.get(s).exists(_ < 300)) 2 else (count+1)%2
      targets(w).println(s)
      writeFiles(source, targets, sizes, count+1, w)
    }
    else {
      // Sequence lines follow their header into the same file.
      targets(which).println(s)
      writeFiles(source, targets, sizes, count, which)
    }
  }
}

You then use Source.fromFile(f).getLines() twice to create your iterators, and you're all set. Edit: in some sense this is the key step, because this is your "lazy" collection. However, it's important not just because it doesn't read everything into memory immediately ("lazy"), but because it doesn't hold on to any previous strings either!
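For example, the driver code might look roughly like this (the input file name and the target-file suffixes are placeholders of mine):

import java.io.{File, PrintWriter}
import scala.io.Source

val f = "input.fasta"   // placeholder file name
// Pass 1: total sequence length per header line.
val sizes = findSizes(Source.fromFile(f).getLines())
// Pass 2: a fresh iterator over the same file, one writer per target file.
// writeFiles closes the writers itself once the iterator is exhausted.
val targets = Array("a", "b", "short").map(sfx => new PrintWriter(new File(f + "." + sfx)))
writeFiles(Source.fromFile(f).getLines(), targets, sizes)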

More generally, Scala can't save you from having to think carefully about what information needs to live in memory and what can be fetched off disk as needed. Lazy evaluation sometimes helps, but there's no magic formula, because you can easily express the requirement to hold all your data in memory in a lazy way too. Scala can't interpret your commands to access memory as, secretly, instructions to fetch stuff off disk instead. (Well, not unless you write a library that caches results from disk, which does exactly that.)
