I can run Scala's foreach in parallel like this:
val N = 100
(0 until N).par.foreach(i => {
// do something
})
But how can I set the number of threads? I want something like this:
val N = 100
val NThreads = 5
(0 until N).par.foreach(NThreads, i => {
// do something
})
Every parallel collection keeps a tasksupport object, which holds a reference to a thread pool implementation. You can therefore set the parallelism level by replacing the collection's tasksupport with one backed by a new thread pool of the size you need, e.g.:
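import scala.collection.parallel._

def f(numOfThreads: Int, n: Int): Unit = {
  val coll = (0 to n).par
  // On Scala 2.12+, pass a java.util.concurrent.ForkJoinPool instead;
  // scala.concurrent.forkjoin.ForkJoinPool is deprecated there.
  coll.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(numOfThreads))
  coll.foreach(i => {
    // do something
  })
}
f(2, 100)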
For more information on configuring parallel collections, see http://docs.scala-lang.org/overviews/parallel-collections/configuration.html
The official Scala documentation shows how to change the task support of a parallel collection:
import scala.collection.parallel._
val pc = mutable.ParArray(1, 2, 3)
pc.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(2))
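One way to confirm that the new task support took effect is to read back the collection's parallelismLevel. The sketch below assumes Scala 2.12, where .par is part of the standard library and ForkJoinTaskSupport accepts a java.util.concurrent.ForkJoinPool (on Scala 2.13 the parallel collections live in the separate scala-parallel-collections module and also need import scala.collection.parallel.CollectionConverters._). The names TaskSupportCheck and levels are made up for illustration.

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

// Hypothetical helper object, not part of the Scala library.
object TaskSupportCheck {
  // Returns the parallelism level a collection reports with its default
  // task support and after switching to a pool with `threads` workers.
  def levels(threads: Int): (Int, Int) = {
    val coll = (0 until 100).par
    val defaultLevel = coll.tasksupport.parallelismLevel
    val pool = new ForkJoinPool(threads)
    try {
      coll.tasksupport = new ForkJoinTaskSupport(pool)
      (defaultLevel, coll.tasksupport.parallelismLevel)
    } finally pool.shutdown() // release the pool's worker threads
  }
}
```

Calling TaskSupportCheck.levels(2) returns a pair whose first element depends on the machine and whose second element is the size of the custom pool.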
The documentation also mentions that:
The execution context task support is set to each parallel collection by default, so parallel collections reuse the same fork-join pool as the future API.
This means you should create a single pool and reuse it. The following approach leaks resources, because every call creates a new ForkJoinPool that is never shut down:
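import java.util.concurrent.ForkJoinPool // on Scala 2.11: scala.concurrent.forkjoin.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

def calculate(collection: Seq[Int]): Seq[Int] = {
  val parallel = collection.par
  // A new pool is created on every call and never shut down
  parallel.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(5))
  parallel.map(_ * 2).seq
}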
The right way is to create the pool once and reuse it:
// Created once and shared by every call to calculate
val taskSupport = new ForkJoinTaskSupport(new ForkJoinPool(5))

def calculate(collection: Seq[Int]): Seq[Int] = {
  val parallel = collection.par
  parallel.tasksupport = taskSupport
  parallel.map(_ * 2).seq
}
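If a dedicated pool really is needed for just one computation, another option is to shut it down explicitly once the work is done. This is only a sketch, assuming Scala 2.12 and java.util.concurrent.ForkJoinPool; ScopedPool and withTaskSupport are hypothetical names, not library API.

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

// Hypothetical helper object, not part of the Scala library.
object ScopedPool {
  // Lends a task support backed by a fresh pool to `body` and shuts the
  // pool down afterwards, even if `body` throws.
  def withTaskSupport[A](threads: Int)(body: ForkJoinTaskSupport => A): A = {
    val pool = new ForkJoinPool(threads)
    try body(new ForkJoinTaskSupport(pool))
    finally pool.shutdown()
  }

  def calculate(collection: Seq[Int]): Seq[Int] =
    withTaskSupport(5) { ts =>
      val parallel = collection.par
      parallel.tasksupport = ts
      // map on a parallel collection is strict, so the work is finished
      // before the pool is shut down
      parallel.map(_ * 2).seq
    }
}
```

Creating and tearing down a pool on every call has a cost, so the shared pool shown above is usually preferable for code that runs often.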