
Can only do 4 concurrent futures as maximum in Scala

I thought that using futures would easily allow me to fire off one-shot code blocks; however, it seems I can only have 4 futures running at a time.

Where does this restriction come from, or am I abusing Futures by using them like this?

import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import java.util.Calendar

object Main extends App{

  val rand = scala.util.Random

  for (x <- 1 to 100) {
    val f = Future {
      //val sleepTime =  rand.nextInt(1000)
      val sleepTime =  2000
      Thread.sleep(sleepTime)

      val today = Calendar.getInstance().getTime()
      println("Future: " + x + " - sleep was: " + sleepTime + " - " + today)
      1;
    }
  }

  Thread.sleep(10000)
}

Output:

Future: 3 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015
Future: 2 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015
Future: 4 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015
Future: 1 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015
Future: 7 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015
Future: 5 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015
Future: 6 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015
Future: 8 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015
Future: 9 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015
Future: 11 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015
Future: 10 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015
Future: 12 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015
Future: 16 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015
Future: 13 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015
Future: 15 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015
Future: 14 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015

I expected them to all show the same time.

To give some context, I thought I could use this construct and extend it with a main loop that sleeps on every iteration for a duration drawn from an exponential distribution, to emulate user arrival and execution of a query. After each sleep I'd like to execute the query by sending it to the program's driver (in this case Spark, whose driver allows multiple threads to use it). Is there a more obvious way than to use Futures?

hbogert asked Aug 31 '15 08:08



2 Answers

When you use import ExecutionContext.Implicits.global, it creates a thread pool whose size equals the number of CPUs.

From the source of ExecutionContext.scala:

The default ExecutionContext implementation is backed by a work-stealing thread pool. By default, the thread pool uses a target number of worker threads equal to the number of [[https://docs.oracle.com/javase/8/docs/api/java/lang/Runtime.html#availableProcessors-- available processors]].
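A quick way to check the number the quoted documentation refers to is to ask the JVM directly. This is only a minimal sketch; the object name PoolSizeCheck is made up for illustration. If it reports 4 on the asker's machine, that would match the batches of four in the output above.

```scala
object PoolSizeCheck {
  // Number of processors the JVM reports. According to the quoted documentation,
  // the default pool targets this many worker threads.
  def availableCores: Int = Runtime.getRuntime.availableProcessors()

  def main(args: Array[String]): Unit =
    println(s"available processors: $availableCores")
}
```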

There is also a good Stack Overflow question on this: What is the behavior of scala.concurrent.ExecutionContext.Implicits.global?

Since the default size of the thread pool depends on the number of CPUs, if you want a larger thread pool you have to write something like

import scala.concurrent.ExecutionContext
import java.util.concurrent.Executors
implicit val ec = ExecutionContext.fromExecutorService(Executors.newWorkStealingPool(8))

before creating the Futures.

(In your code, place it before the for loop.)

Note that the work-stealing pool (Executors.newWorkStealingPool) was added in Java 8. Scala has its own ForkJoinPool that does the work stealing; see: scala.concurrent.forkjoin.ForkJoinPool vs java.util.concurrent.ForkJoinPool

Also, if you want one thread per Future, you can create a separate single-thread executor for each Future, with something like

implicit val ec = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor)

Therefore, the following code runs the 100 futures in parallel, each on its own thread

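import scala.concurrent._
import java.util.Calendar
import java.util.concurrent.Executors

object Main extends App {
  for (x <- 1 to 100) {
    // A fresh single-thread executor per iteration, so every Future gets its own thread.
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor)
    val f = Future {
      val sleepTime = 2000
      Thread.sleep(sleepTime)

      val today = Calendar.getInstance().getTime()
      println("Future: " + x + " - sleep was: " + sleepTime + " - " + today)
      1
    }
    // Shut the executor down once the Future is done, so its thread does not keep the JVM alive.
    f.onComplete(_ => ec.shutdown())
  }

  Thread.sleep(10000)
}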

In addition to the work-stealing thread pool and single-thread executors, there are some other executors: http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html
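As one example of those other executors, a cached thread pool creates threads on demand and reuses idle ones, which suits many short blocking tasks. The sketch below is an illustration rather than part of the original answer: the object name CachedPoolDemo and the run helper are made up, and the 30-second timeout is an arbitrary choice.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object CachedPoolDemo {
  // Runs n futures that each sleep for sleepMs on a cached thread pool and
  // returns the elapsed wall-clock time in milliseconds.
  def run(n: Int, sleepMs: Long): Long = {
    val pool = Executors.newCachedThreadPool()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val start = System.nanoTime()
    try {
      val futures = (1 to n).map { x =>
        Future {
          Thread.sleep(sleepMs)
          x
        }
      }
      // Wait for all futures; the timeout is an assumption for this sketch.
      Await.result(Future.sequence(futures), 30.seconds)
      (System.nanoTime() - start) / 1000000
    } finally {
      // Release the pool's threads so they do not keep the JVM alive.
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"100 futures took ${run(100, 2000)} ms")
}
```

Because the pool is unbounded, the elapsed time should stay close to a single sleep interval regardless of the core count, at the cost of one thread per concurrently sleeping future.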

Read the docs for details: http://docs.scala-lang.org/overviews/core/futures.html

ymonad answered Sep 26 '22 22:09


The default pool when using import scala.concurrent.ExecutionContext.Implicits.global indeed has as many threads as you have cores on your machine. This is ideal for non-blocking code (no synchronous I/O, sleep, ...) but can be problematic and even cause deadlocks when you use it for blocking code.

However, this pool can actually grow if you mark blocking code with a scala.concurrent.blocking block. The same marker is used, for example, by the Await.result and Await.ready functions, which block while waiting for a Future.

See the API docs for blocking.

So all you have to do is update your example:

import scala.concurrent.blocking
...
val sleepTime = 2000
blocking{
  Thread.sleep(sleepTime)
}
...

Now all futures will complete after about 2000 ms.
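For reference, a complete version of the example with the blocking marker might look like the sketch below. The object name BlockingDemo and the run helper are made up for illustration, and the 30-second timeout is an arbitrary choice. Newer Scala versions cap how many extra threads the global pool may add, so very large numbers of blocking futures may still queue up.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingDemo {
  // Starts n futures on the global pool, each sleeping inside a blocking block,
  // and returns the elapsed wall-clock time in milliseconds.
  def run(n: Int, sleepMs: Long): Long = {
    val start = System.nanoTime()
    val futures = (1 to n).map { x =>
      Future {
        // Tells the pool this section blocks, so it may add a compensating thread.
        blocking {
          Thread.sleep(sleepMs)
        }
        x
      }
    }
    // Wait for all futures; the timeout is an assumption for this sketch.
    Await.result(Future.sequence(futures), 30.seconds)
    (System.nanoTime() - start) / 1000000
  }

  def main(args: Array[String]): Unit =
    println(s"100 futures took ${run(100, 2000)} ms")
}
```

Unlike a dedicated executor, this keeps the default pool for ordinary non-blocking work and only asks for extra threads around the sections that actually block.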

Somatik answered Sep 24 '22 22:09