
What is the best way to create a map in parallel with Scala?

Suppose I have a collection that should be converted to a Map, but not in a one-to-one fashion as with the map method.

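// Element type Int is assumed here; it matches the types used in the answers.
val map = collection.mutable.HashMap[Int, Boolean]()
for (p <- dataList.par) {
  if (cond1(p)) {
    map += (p -> true) // unsynchronized update from several threads: not thread-safe
  } else {
    // do nothing
  }
}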

I've come up with several solutions and want to know which is best.

  1. map.synchronized { map += (p -> true) }

  2. Use an actor to update the map. But I don't know how to wait until all the actors' tasks are completed.

  3. Yield Some(p) or None and then run foreach { case Some(p) => map += (p -> true); case None => }. But I don't know how to make that step sequential when the first iterator comes from a parallel collection.
yura asked Dec 17 '22 08:12


2 Answers

I'm not sure this will actually perform best, but it should evaluate the conditions in parallel:

import scala.collection._
val map: mutable.Map[Int, Boolean] =
  dataList.par.collect { case p if cond1(p) => (p, true) }(breakOut)

(with a mutable Map as it is what your code did, but this is not required).

Context must give the type of the expected result (hence the : mutable.Map[Int, Boolean]) for breakOut to work.

Edit: breakOut is scala.collection.breakOut. A collection operation that returns a collection (here collect) takes an implicit argument bf: CanBuildFrom[SourceCollectionType, ElementType, ResultType]. The implicit CanBuildFroms provided by the library are arranged so that the best possible ResultType is returned, where best means closest to the source collection type. Passing breakOut in place of this implicit argument lets another CanBuildFrom, and hence another result type, be selected: breakOut selects the CanBuildFrom irrespective of the source collection type. But then many implicits are available and no priority rule applies. That is why the context must give the result type, so that one of the implicits can be selected.

To sum up, when breakOut is passed in place of the implicit argument, the result will be built to the type expected in the context.
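
As a minimal sketch of that summary, the snippet below uses breakOut on an ordinary sequential List, assuming Scala 2.12 or earlier (breakOut was removed in 2.13). The object name BreakOutSketch and the sample data are made up for illustration. The same map call yields a List, a Map or a Set depending only on the expected type.

```scala
import scala.collection.breakOut

// Hypothetical demo object; assumes Scala 2.12 or earlier, where breakOut exists.
object BreakOutSketch {
  val words = List("a", "bb", "ccc")

  // No breakOut: the default CanBuildFrom for List is chosen, so the result is a List.
  val asList: List[(String, Int)] = words.map(w => (w, w.length))

  // breakOut: the expected type Map[String, Int] selects the builder,
  // so the pairs are added directly to a Map without an intermediate List.
  val asMap: Map[String, Int] = words.map(w => (w, w.length))(breakOut)

  // Same call shape, different expected type, different result collection.
  val asSet: Set[Int] = words.map(_.length % 2)(breakOut)

  def main(args: Array[String]): Unit = {
    println(asList)
    println(asMap)
    println(asSet)
  }
}
```

If the type annotation on asMap or asSet is removed, the compiler has nothing to select a CanBuildFrom with, which is the reason the expected type must be given.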

Didier Dupont answered Jan 19 '23 01:01


The breakOut mentioned in the other answer resolves to a builder factory for the collection of the expected type of map. The expected type of map is mutable.Map[Int, Boolean].

Since the builder factory is provided by a sequential collection, the collect will not proceed in parallel:

scala> val cond1: Int => Boolean = _ % 2 == 0
cond1: Int => Boolean = <function1>

scala> val dataList = 1 to 10
dataList: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val map: mutable.Map[Int,Boolean] = dataList.par.collect{case p if cond1(p) => println(Thread.currentThread); (p, true)}(breakOut)
Thread[Thread-8,5,main]
Thread[Thread-8,5,main]
Thread[Thread-8,5,main]
Thread[Thread-8,5,main]
Thread[Thread-8,5,main]
map: scala.collection.mutable.Map[Int,Boolean] = Map(10 -> true, 8 -> true, 4 -> true, 6 -> true, 2 -> true)

You can see that from the thread name: if the collect had run in parallel, the thread names would contain ForkJoin-something.
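
The same thread-name check can be done without reading println output. The following is only a sketch, assuming Scala 2.12 or earlier (where .par is part of the standard library) and Java 8 or later (for ConcurrentHashMap.newKeySet). The names ThreadNameSketch and threadNames are hypothetical helpers, not part of any library. It collects the distinct names of the threads that executed a closure on a parallel collection.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// Hypothetical helper; assumes Scala 2.12 or earlier and Java 8+.
object ThreadNameSketch {
  // Runs a closure once per element of data.par and returns the distinct
  // names of the threads that executed it.
  def threadNames(data: Seq[Int]): Set[String] = {
    // Concurrent set, because several threads may add names at the same time.
    val names = ConcurrentHashMap.newKeySet[String]()
    data.par.foreach(_ => names.add(Thread.currentThread.getName))
    names.asScala.toSet
  }

  def main(args: Array[String]): Unit = {
    val names = threadNames(1 to 1000)
    println(names)
    // Names containing "ForkJoin" indicate that pool worker threads did the work.
    println("ran on fork-join workers: " + names.exists(_.contains("ForkJoin")))
  }
}
```

The exact thread names depend on the Scala version and the configured task support, so the output is best treated as a diagnostic hint rather than a guarantee.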

The Correct Way

The correct way to do it should be to first use the breakOut with the expected type being a parallel map, so that the collect proceeds in parallel:

scala> val map: parallel.mutable.ParMap[Int,Boolean] = dataList.par.collect{case p if cond1(p) => println(Thread.currentThread);(p, true)}(breakOut)
Thread[Thread-9,5,main]
Thread[Thread-9,5,main]
Thread[Thread-9,5,main]
Thread[Thread-9,5,main]
Thread[Thread-9,5,main]
map: scala.collection.parallel.mutable.ParMap[Int,Boolean] = ParHashMap(10 -> true, 8 -> true, 4 -> true, 6 -> true, 2 -> true)

and then call seq on the result of collect, since seq is always O(1).

UPDATE: just checked - this seems to work correctly with trunk, but not with 2.9.1.final.

The Patch

But, as you can see from the thread names, this doesn't run in parallel either. That is a bug, and it will be fixed in the next version of Scala. A workaround:

scala> val map: parallel.mutable.ParMap[Int, Boolean] = dataList.par.collect{case p if cond1(p) => println(Thread.currentThread);(p, true)}.map(x => x)(breakOut)
Thread[ForkJoinPool-1-worker-7,5,main]
Thread[ForkJoinPool-1-worker-3,5,main]
Thread[ForkJoinPool-1-worker-0,5,main]
Thread[ForkJoinPool-1-worker-8,5,main]
Thread[ForkJoinPool-1-worker-1,5,main]
map: scala.collection.parallel.mutable.ParMap[Int,Boolean] = ParHashMap(10 -> true, 8 -> true, 4 -> true, 6 -> true, 2 -> true)

scala> val sqmap = map.seq
sqmap: scala.collection.mutable.Map[Int,Boolean] = Map(10 -> true, 8 -> true, 4 -> true, 6 -> true, 2 -> true)

Note that the final map is currently performed sequentially.

Alternatively, if just a parallel.ParMap is ok with you, you can do:

scala> val map: Map[Int, Boolean] = dataList.par.collect{case p if cond1(p) => println(Thread.currentThread);(p, true)}.toMap.seq
Thread[ForkJoinPool-1-worker-2,5,main]
Thread[ForkJoinPool-1-worker-3,5,main]
Thread[ForkJoinPool-1-worker-7,5,main]
Thread[ForkJoinPool-1-worker-1,5,main]
Thread[ForkJoinPool-1-worker-8,5,main]
map: scala.collection.Map[Int,Boolean] = Map(10 -> true, 6 -> true, 2 -> true, 8 -> true, 4 -> true)
axel22 answered Jan 18 '23 23:01