Why does this Scala code execute two Futures on one thread?

I've been using multiple threads for a long time, yet I cannot explain such a simple case.

import java.util.concurrent.Executors
import scala.concurrent._
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))

def addOne(x: Int) = Future(x + 1)
def addTwo(x: Int) = Future {addOne(x + 1)}

addTwo(1)
// res5: Future[Future[Int]] = Future(Success(Future(Success(3))))

To my surprise, it works, and I don't know why.

Question:
Why can a pool with a single thread execute two Futures at the same time?

My expectation:
The first Future (addTwo) occupies the one and only thread (newFixedThreadPool(1)), and then it creates another Future (addOne), which needs a thread of its own.
So the program should end up starved for threads and get stuck.

WeiChing 林煒清 asked Jul 01 '19 02:07


2 Answers

Your code works because both futures are executed by the same thread, one after the other. The ExecutionContext you are creating does not dedicate a Thread to each Future; instead it schedules tasks (Runnable instances) for execution. When no thread in the pool is available, these tasks are put into a BlockingQueue, where they wait until a thread is free. (See the ThreadPoolExecutor API for details.)

If you look at the implementation of Executors.newFixedThreadPool(1), you'll see that it creates an executor with an unbounded queue:

new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable])
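
A minimal sketch of the queuing behaviour described above, assuming Scala 2.12 or later. The object name SingleThreadDemo and its run method are made up for illustration. It records which thread runs the outer Future and which runs the inner one; on a one-thread pool both names should be the same, because the inner task simply waits in the queue until the outer block returns.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper, not part of the original question or answer.
object SingleThreadDemo {
  // Returns the names of the threads that ran the outer and the inner Future.
  def run(): (String, String) = {
    val pool = Executors.newFixedThreadPool(1)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val outer: Future[(String, Future[String])] = Future {
        // The inner task is only queued here: the single thread is still
        // busy running this block.
        val inner = Future(Thread.currentThread().getName)
        (Thread.currentThread().getName, inner)
      }
      // Waiting on the calling thread (not a pool thread) does not block the pool.
      val (outerName, inner) = Await.result(outer, 5.seconds)
      val innerName = Await.result(inner, 5.seconds)
      (outerName, innerName)
    } finally pool.shutdown()
  }
}
```

Calling SingleThreadDemo.run() returns a pair of thread names; the outer Future finishes first and the inner one runs afterwards on the same worker.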

To get the effect of thread-starvation that you were looking for, you could create an executor with a limited queue yourself:

 implicit val ec = ExecutionContext.fromExecutor(new ThreadPoolExecutor(1, 1, 0L, 
                     TimeUnit.MILLISECONDS, new ArrayBlockingQueue[Runnable](1)))

Since the minimum capacity of an ArrayBlockingQueue is 1, you need three futures to reach the limit. You also need to add some code that runs on the result of each future, to keep them from completing (in the example below I do this by adding .map(identity)).

The following example

import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
import scala.concurrent._
implicit val ec = ExecutionContext.fromExecutor(new ThreadPoolExecutor(1, 1, 0L,
                      TimeUnit.MILLISECONDS, new ArrayBlockingQueue[Runnable](1)))

def addOne(x: Int) = Future {
  x + 1
}
def addTwo(x: Int) = Future {
  addOne(x + 1).map(identity)
}
def addThree(x: Int) = Future {
  addTwo(x + 1).map(identity)
}

println(addThree(1))

fails with

java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@65a264b6 rejected from java.util.concurrent.ThreadPoolExecutor@10d078f4[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 1]
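
The same limit can be reproduced without Futures by submitting plain Runnables. This is only a sketch under stated assumptions: the object name BoundedQueueDemo and the latch-based blocker task are hypothetical and serve to keep the single worker busy. The first task occupies the thread, the second fills the one-slot queue, and the third has nowhere to go, so the executor's default policy rejects it.

```scala
import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch,
  RejectedExecutionException, ThreadPoolExecutor, TimeUnit}

// Hypothetical helper, not part of the original answer.
object BoundedQueueDemo {
  // Returns true if the third submission is rejected.
  def thirdTaskRejected(): Boolean = {
    val pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
      new ArrayBlockingQueue[Runnable](1))
    val release = new CountDownLatch(1)
    // Keeps the only worker thread busy until the latch is released.
    val blocker: Runnable = () => release.await()
    val noop: Runnable = () => ()
    try {
      pool.execute(blocker) // task 1: runs on the single thread
      pool.execute(noop)    // task 2: waits in the queue (capacity 1)
      try {
        pool.execute(noop)  // task 3: no free thread and no free queue slot
        false
      } catch {
        case _: RejectedExecutionException => true
      }
    } finally {
      release.countDown()   // let the blocked worker finish
      pool.shutdown()
    }
  }
}
```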
Harald Gliebe answered Sep 30 '22 13:09


It is easier to understand if you expand the Futures into Promises:

import scala.concurrent.{Future, Promise}
import scala.util.Success

// assumes the implicit ec from the question is in scope
val p1 = Promise[Future[Int]]()
ec.execute(() => {
  // the first task starts running
  val p2 = Promise[Int]()
  // the second task is submitted, but it does not run yet
  ec.execute(() => {
    p2.complete(Success(1))
    println(s"task 2 -> p1:${p1},p2:${p2}")
  })
  // p1 is completed here, without waiting for p2.future to finish
  p1.complete(Success(p2.future))
  println(s"task 1 -> p1:${p1},p2:${p2}") // p1 is completed, but p2 is not yet
  // the first task finishes; only then does the second task run
})
val result: Future[Future[Int]] = p1.future

Thread.sleep(1000)
println(result)
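
For contrast, here is a sketch of what happens when the first task does wait for the second one. It is an illustration only: the object name BlockingDemo is hypothetical, and the 500 ms timeout is there only so that the example terminates. Without a timeout, the outer task would hold the only thread forever while the inner task sits in the queue, which is the stuck program the question expected.

```scala
import java.util.concurrent.{Executors, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Failure

// Hypothetical helper, not part of the original answer.
object BlockingDemo {
  // Returns true if the outer Future failed because waiting for the inner one timed out.
  def outerTimesOut(): Boolean = {
    val pool = Executors.newFixedThreadPool(1)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val outer = Future {
        val inner = Future(1) // queued behind the task that is running right now
        // Blocks the only thread, so `inner` cannot start before the timeout expires.
        Await.result(inner, 500.millis)
      }
      Await.ready(outer, 5.seconds).value match {
        case Some(Failure(_: TimeoutException)) => true
        case _                                  => false
      }
    } finally pool.shutdown()
  }
}
```

The difference from the code in the question is the blocking Await.result inside the outer task; returning the inner Future without waiting for it, as addTwo does, frees the thread immediately.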
余杰水 answered Sep 30 '22 13:09