Parallel collection processing of data larger than memory size

Is there a simple way to use Scala parallel collections without loading a full collection into memory?

For example, I have a large collection and I'd like to perform a particular operation (a fold) in parallel on one small chunk that fits into memory, then on another chunk, and so on, and finally recombine the results from all chunks.

I know that actors could be used, but it would be really nice to use par-collections.

I've written a solution, but it isn't nice:

  // On Scala 2.13+, .par needs the scala-parallel-collections module and
  // import scala.collection.parallel.CollectionConverters._
  def split[A](list: Iterable[A], chunkSize: Int): Iterable[Iterable[A]] = {
    new Iterator[Iterable[A]] {
      var rest = list
      def hasNext = rest.nonEmpty
      def next = {
        val chunk = rest.take(chunkSize)
        rest = rest.drop(chunkSize)
        chunk
      }
    }.toIterable
  }

  def foldPar[A](acc: A)(list: Iterable[A], chunkSize: Int, combine: ((A, A) => A)): A = {
    val chunks: Iterable[Iterable[A]] = split(list, chunkSize)
    // Reduce each chunk in parallel, then fold the chunk result into the
    // running accumulator. (entries.par.fold(res)(combine) would be wrong
    // here: a parallel fold may apply its zero argument once per partition.)
    def combineChunk: ((A, Iterable[A]) => A) =
      { case (res, entries) => combine(res, entries.par.reduce(combine)) }
    chunks.foldLeft(acc)(combineChunk)
  }

  val chunkSize = 10000000
  val x = 1 to chunkSize * 10 // the Int sum overflows; this just exercises the chunking

  def sum: ((Int, Int) => Int) = { case (acc, n) => acc + n }

  foldPar(0)(x, chunkSize, sum)
asked Jun 30 '13 by Mikhail Golubtsov


1 Answer

Your idea is very neat and it's a pity there is no such function available already (AFAIK).

I just rephrased your idea into slightly shorter code. First, I feel that for parallel folding it's useful to use the concept of a monoid: a structure with an associative operation and a zero element. Associativity is important because we don't know the order in which results computed in parallel will be combined. And the zero element is important so that we can split the computation into blocks and start folding each one from zero. There is nothing new about this, though; it's just what fold on Scala's collections already expects.

// The function defined by Monoid's apply must be associative
// and zero its identity element.
trait Monoid[A]
  extends Function2[A,A,A]
{
  val zero: A
}

Next, Scala's Iterators already have a useful method grouped(size: Int), which slices the iterator into fixed-size Seq[A] blocks (the last one possibly shorter). It's quite similar to your split. This allows us to cut the input into fixed-size blocks and then apply Scala's parallel collection methods to them:

def parFold[A](c: Iterator[A], blockSize: Int)(implicit monoid: Monoid[A]): A =
  c.grouped(blockSize).map(_.par.fold(monoid.zero)(monoid))
                      .fold(monoid.zero)(monoid);

We fold each block using the parallel collections framework and then (without any parallelization) combine the intermediate results.
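
Just to make the slicing concrete, here is what grouped does on a tiny input (a quick sketch; I sum each block so the output doesn't depend on the exact Seq type grouped returns):

// Iterator.range(1, 8) yields 1..7; grouped(3) slices it into
// (1,2,3), (4,5,6), (7), and we sum each block.
val blockSums = Iterator.range(1, 8).grouped(3).map(_.sum).toList
// List(6, 15, 7)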

An example:

// Example:
object SumMonoid extends Monoid[Long] {
  override val zero: Long = 0;
  override def apply(x: Long, y: Long) = x + y;
}
val it = Iterator.range(1, 10000001).map(_.toLong)
println(parFold(it, 100000)(SumMonoid)); // prints 50000005000000
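
Nothing above is specific to summation; any associative operation with an identity works. As a further sketch (my addition, following the same pattern), a maximum monoid whose identity is Long.MinValue:

object MaxMonoid extends Monoid[Long] {
  // Long.MinValue is the identity for max: max(Long.MinValue, x) == x.
  override val zero: Long = Long.MinValue;
  override def apply(x: Long, y: Long) = math.max(x, y);
}
println(parFold(Iterator.range(1, 10000001).map(_.toLong), 100000)(MaxMonoid)); // prints 10000000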
answered Nov 04 '22 by Petr