Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Memory consumption of a parallel Scala Stream

I have written a Scala (2.9.1-1) application that needs to process several million rows from a database query. I am converting the ResultSet to a Stream using the technique shown in the answer to one of my previous questions:

class Record(...)

val resultSet = statement.executeQuery(...)

new Iterator[Record] {
  def hasNext = resultSet.next()
  def next = new Record(resultSet.getString(1), resultSet.getInt(2), ...)
}.toStream.foreach { record => ... }

and this has worked very well.

Since the body of the foreach closure is very CPU intensive, and as a testament to the practicality of functional programming, if I add a .par before the foreach, the closures get run in parallel with no other effort, except to make sure that the body of the closure is thread safe (it is written in a functional style with no mutable data except printing to a thread-safe log).

However, I am worried about memory consumption. Is the .par causing the entire result set to load in RAM, or does the parallel operation load only as many rows as it has active threads? I've allocated 4G to the JVM (64-bit with -Xmx4g) but in the future I will be running it on even more rows and worry that I'll eventually get an out-of-memory.

Is there a better pattern for doing this kind of parallel processing in a functional manner? I've been showing this application to my co-workers as an example of the value of functional programming and multi-core machines.

like image 704
Ralph Avatar asked Mar 22 '12 11:03

Ralph


1 Answers

If you look at the scaladoc of Stream, you will notice that the definition class of par is the Parallelizable trait... and, if you look at the source code of this trait, you will notice that it takes each element from the original collection and put them into a combiner, thus, you will load each row into a ParSeq:

  def par: ParRepr = {
    val cb = parCombiner
    for (x <- seq) cb += x
    cb.result
  }

  /** The default `par` implementation uses the combiner provided by this method
   *  to create a new parallel collection.
   *
   *  @return  a combiner for the parallel collection of type `ParRepr`
   */
  protected[this] def parCombiner: Combiner[A, ParRepr]

A possible solution is to explicitly parallelize your computation, thanks to actors for example. You can take a look at this example from the akka documentation for example, that might be helpful in your context.

like image 108
Nicolas Avatar answered Oct 04 '22 20:10

Nicolas