How to set the number of threads for parallel collections?

I can run Scala's foreach in parallel like this:

val N = 100
(0 until N).par.foreach(i => {
   // do something
})

But how can I set the number of threads? I want something like this:

val N = 100
val NThreads = 5
(0 until N).par.foreach(NThreads, i => {
   // do something
})
user1312837 asked Jun 09 '16 12:06



2 Answers

Every parallel collection holds a tasksupport object, which keeps a reference to the thread pool implementation that runs its tasks.

So you can set the parallelism level by assigning the collection's tasksupport to a new one backed by a thread pool of the size you need, e.g.:

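def f(numOfThreads: Int, n: Int): Unit = {
  import java.util.concurrent.ForkJoinPool // Scala 2.11: use scala.concurrent.forkjoin.ForkJoinPool
  import scala.collection.parallel._
  import scala.collection.parallel.CollectionConverters._ // Scala 2.13+ only: provides .par
  val coll = (0 to n).par
  // Replace the default task support with one backed by a pool of numOfThreads threads
  coll.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(numOfThreads))
  coll.foreach(i => {
    // do something
  })
}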

f(2, 100)
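The same idea can be made self-contained and leak-free by shutting the dedicated pool down once the work is finished. This is a minimal sketch, assuming Scala 2.13 (or 3) with the scala-parallel-collections module on the classpath (on 2.12 and earlier `.par` is built in, so drop the `CollectionConverters` import). `ParallelismDemo` and `doubledSum` are hypothetical names, and the map/sum body only stands in for "do something".

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.CollectionConverters._ // Scala 2.13+: provides .par
import scala.collection.parallel.ForkJoinTaskSupport

// Hypothetical example object, not part of any library.
object ParallelismDemo {
  // Doubles every element of 0 until n and sums the results, running the
  // parallel work on a dedicated pool whose parallelism level is numThreads.
  def doubledSum(numThreads: Int, n: Int): Long = {
    val pool = new ForkJoinPool(numThreads)
    try {
      val coll = (0 until n).par
      coll.tasksupport = new ForkJoinTaskSupport(pool)
      coll.map(i => 2L * i).sum
    } finally {
      pool.shutdown() // let the pool's worker threads terminate
    }
  }

  def main(args: Array[String]): Unit =
    println(doubledSum(numThreads = 2, n = 100)) // prints 9900
}
```

Keep in mind that a ForkJoinPool's parallelism level is a target rather than a hard cap on thread count: the pool may start spare threads while existing workers are blocked joining other tasks.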

For more information on configuring parallel collections, refer to http://docs.scala-lang.org/overviews/parallel-collections/configuration.html

curious answered Oct 12 '22 09:10


The official Scala documentation shows how to change the task support of a parallel collection:

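import scala.collection.parallel._
val pc = mutable.ParArray(1, 2, 3)
// scala.concurrent.forkjoin was deprecated in Scala 2.12 and removed in 2.13;
// from 2.12 on, pass a java.util.concurrent.ForkJoinPool instead.
pc.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(2))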

It also says:

The execution context task support is set to each parallel collection by default, so parallel collections reuse the same fork-join pool as the future API.
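A quick way to see the default described in that quote is to inspect the tasksupport of a freshly created collection. This is a small sketch, assuming Scala 2.13 (or 3) with the scala-parallel-collections module; `DefaultTaskSupport` and `isExecutionContextBased` are hypothetical names.

```scala
import scala.collection.parallel.CollectionConverters._ // Scala 2.13+: provides .par
import scala.collection.parallel.ExecutionContextTaskSupport

// Hypothetical example object, not part of any library.
object DefaultTaskSupport {
  // True if a freshly created parallel collection uses the execution-context
  // task support, i.e. it runs on the pool shared with the Future API.
  def isExecutionContextBased: Boolean =
    (0 until 10).par.tasksupport.isInstanceOf[ExecutionContextTaskSupport]

  def main(args: Array[String]): Unit =
    println(isExecutionContextBased)
}
```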

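This means you should create a single pool and reuse it. The following approach leaks resources, because every call creates a new ForkJoinPool (with its own worker threads) that is never explicitly shut down: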

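import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.CollectionConverters._ // Scala 2.13+ only: provides .par

def calculate(collection: Seq[Int]): Seq[Int] = {
  val parallel = collection.par
  // A new pool with its own threads on every call, never shut down
  parallel.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(5))
  parallel.map(_ * 2).seq
}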

The right way is to reuse an existing pool:

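import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.CollectionConverters._ // Scala 2.13+ only: provides .par

// Created once and shared by every call to calculate
val taskSupport = new ForkJoinTaskSupport(new ForkJoinPool(5))

def calculate(collection: Seq[Int]): Seq[Int] = {
  val parallel = collection.par
  parallel.tasksupport = taskSupport
  parallel.map(_ * 2).seq
}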
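If, as in the question, callers need to choose the thread count per call, the reuse idea can be extended by keeping one shared pool per requested thread count. This is a minimal sketch, assuming Scala 2.13 (or 3) with the scala-parallel-collections module; `SharedPools`, `taskSupport`, `calculate` and `shutdownAll` are hypothetical names, not library API.

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.mutable
import scala.collection.parallel.CollectionConverters._ // Scala 2.13+: provides .par
import scala.collection.parallel.ForkJoinTaskSupport

// Hypothetical helper: one shared pool per requested thread count, created on
// first use and reused by every later call with the same count.
object SharedPools {
  private val pools = mutable.Map.empty[Int, (ForkJoinPool, ForkJoinTaskSupport)]

  // Returns the task support for numThreads, creating its pool only once.
  def taskSupport(numThreads: Int): ForkJoinTaskSupport = synchronized {
    pools.getOrElseUpdate(numThreads, {
      val pool = new ForkJoinPool(numThreads)
      (pool, new ForkJoinTaskSupport(pool))
    })._2
  }

  // Doubles every element in parallel on the shared pool for numThreads.
  def calculate(collection: Seq[Int], numThreads: Int): List[Int] = {
    val parallel = collection.par
    parallel.tasksupport = taskSupport(numThreads)
    parallel.map(_ * 2).toList
  }

  // Call once, e.g. when the application stops, to release every pool's threads.
  def shutdownAll(): Unit = synchronized {
    pools.values.foreach { case (pool, _) => pool.shutdown() }
    pools.clear()
  }
}
```

The number of pools is bounded by the number of distinct thread counts requested, so nothing grows per call; the trade-off is that these pools live until `shutdownAll` is called.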
Avseiytsev Dmitriy answered Oct 12 '22 11:10