
How do I set the default number of threads for Scala 2.10 parallel collections?

In Scala versions before 2.10, I could set the parallelism of the defaultForkJoinPool (as in this answer: "scala parallel collections degree of parallelism"). In Scala 2.10, that API no longer exists. It is well documented that the parallelism of a single collection can be set (http://docs.scala-lang.org/overviews/parallel-collections/configuration.html) by assigning to its tasksupport property.
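For reference, the per-collection configuration the question wants to avoid repeating looks roughly like this. It is a minimal sketch assuming Scala 2.10 through 2.12 (where `.par` is built in and `scala.concurrent.forkjoin.ForkJoinPool` is available); `PerCollectionParallelism` and `doubledSum` are hypothetical names for illustration.

```scala
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

object PerCollectionParallelism {
  // Doubles 1..n in parallel on a dedicated pool and sums the result.
  def doubledSum(n: Int, threads: Int): Int = {
    val pool = new ForkJoinPool(threads)
    try {
      val pc = (1 to n).par
      // The "extra two lines": this one collection gets its own pool
      pc.tasksupport = new ForkJoinTaskSupport(pool)
      pc.map(_ * 2).sum
    } finally {
      pool.shutdown() // release the pool's worker threads once the work is done
    }
  }

  def main(args: Array[String]): Unit =
    println(doubledSum(1000, 2)) // prints 1001000
}
```

Every `.par` call site would need the same assignment, which is exactly the repetition the question is asking how to avoid.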

However, I use parallel collections all over my codebase and would rather not add two extra lines to every single collection instantiation. Is there some way to configure the global default thread pool size so that someCollection.par.map(f(_)) automatically uses the configured number of threads?

asked Jul 25 '13 at 18:07 by JZeta



1 Answer

I know that the question is over a month old, but I've just had exactly the same question. Googling wasn't helpful and I couldn't find anything that looked halfway sane in the new API.

Setting -Dscala.concurrent.context.maxThreads=n, as suggested in "Set the parallelism level for all collections in Scala 2.10?", seemingly had no effect at all, but I'm not sure whether I used it correctly (I run my application with 'java' in an environment where 'scala' is not explicitly installed, which might be the cause).
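One quick way to rule out a launch problem is to read the property back from inside the running program. This is a hedged diagnostic sketch: it only shows whether the JVM received the property, not whether Scala 2.10 parallel collections honour it. `CheckMaxThreads` and `maxThreadsSetting` are hypothetical names.

```scala
object CheckMaxThreads {
  // Returns the value the JVM received for -Dscala.concurrent.context.maxThreads, if any.
  // sys.props is a live view of the JVM system properties.
  def maxThreadsSetting: Option[String] =
    sys.props.get("scala.concurrent.context.maxThreads")

  def main(args: Array[String]): Unit =
    println(s"scala.concurrent.context.maxThreads = ${maxThreadsSetting.getOrElse("<not set>")}")
}
```

When launching with plain 'java', the `-D` option has to appear before the main class name (or before `-jar`); anything after the class name is passed to `main` as a program argument and never becomes a system property.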

I don't know why the Scala developers removed this essential setter from the package object.

However, it's often possible to use reflection to work around an incomplete or awkward interface:

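```scala
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

def setParallelismGlobally(numThreads: Int): Unit = {
  // The package object that holds the `defaultTaskSupport` val
  val parPkgObj = scala.collection.parallel.`package`
  // The val is backed by a private field, so search the declared fields by name
  val defaultTaskSupportField = parPkgObj.getClass.getDeclaredFields.find {
    _.getName == "defaultTaskSupport"
  }.get

  // Lift the access checks so the private field can be overwritten
  defaultTaskSupportField.setAccessible(true)
  // Replace the default with a task support backed by a pool of the requested parallelism
  defaultTaskSupportField.set(
    parPkgObj,
    new ForkJoinTaskSupport(new ForkJoinPool(numThreads))
  )
}
```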

For those not familiar with the more obscure features of Scala, here is a short explanation:

scala.collection.parallel.`package`

accesses the package object that holds the defaultTaskSupport val (it looks somewhat like a Java static variable, but it's actually a member of the package object). The backticks are required around the identifier because package is a reserved keyword. Then we look up the private final field we want (getField("defaultTaskSupport") doesn't find it, because getField only returns public fields), make it accessible so that it can be modified, and finally replace its value with our own ForkJoinTaskSupport.
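The same trick can be tried in isolation on a toy singleton, which makes it easier to see what each reflective call does. This is a hedged sketch: `Settings`, `limit`, `ReflectiveOverride` and `overrideLimit` are hypothetical stand-ins for the package object and its `defaultTaskSupport` val.

```scala
// Hypothetical stand-in for a package object that exposes a val but no setter.
// The explicit type keeps `limit` a real field read through its accessor.
object Settings {
  val limit: Int = 4
}

object ReflectiveOverride {
  // Overwrites the field backing `Settings.limit` via java.lang.reflect.
  def overrideLimit(newValue: Int): Unit = {
    val module = Settings // the singleton instance whose field we modify
    // getDeclaredField, because the backing field is private
    val field = module.getClass.getDeclaredField("limit")
    field.setAccessible(true)       // lift Java access checks on the private field
    field.setInt(module, newValue)  // write the new value directly into the singleton
  }
}
```

After ReflectiveOverride.overrideLimit(8), reading Settings.limit goes through the generated accessor and sees the new value, just as later reads of defaultTaskSupport see the replaced task support.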

I don't yet understand exactly how parallel collections are created, but the source code of the Combiner trait suggests that the value of defaultTaskSupport should percolate to the parallel collections somehow.
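One way to check whether a freshly created collection picks up the overridden default is to inspect its public tasksupport getter. This sketch assumes Scala 2.10 through 2.12, where ForkJoinTaskSupport exposes its pool as `environment`; `InspectTaskSupport` and `describe` are hypothetical names. If the default is some other TaskSupport, it just reports the class name.

```scala
import scala.collection.parallel.{ForkJoinTaskSupport, TaskSupport}

object InspectTaskSupport {
  // Describes a TaskSupport, including the pool's parallelism when it is fork/join based.
  def describe(ts: TaskSupport): String = ts match {
    case fj: ForkJoinTaskSupport => s"ForkJoinTaskSupport, parallelism = ${fj.environment.getParallelism}"
    case other                   => other.getClass.getName
  }

  def main(args: Array[String]): Unit =
    // A new collection created after the override should report the replaced default
    println(describe((1 to 10).par.tasksupport))
}
```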

Notice that the question is qualitatively of the same sort as a much older one: "I have Math.random() all over my codebase; how can I set the seed to a fixed number for debugging purposes?" (see e.g. "Set seed on Math.random()"). In both cases we have some sort of global "static" variable that we implicitly use in a million different places and want to change, but there is no setter for it, so we resort to reflection.

Ugly as hell, but it seems to work just fine. If you need to limit the total number of threads, don't forget that the garbage collector runs on separate threads.

answered Nov 01 '22 at 13:11 by Andrey Tyukin
