I've been using multiple threads for a long time, yet can not explain such a simple case.
import java.util.concurrent.Executors
import scala.concurrent._
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
def addOne(x: Int) = Future(x + 1)
def addTwo(x: Int) = Future {addOne(x + 1)}
addTwo(1)
// res5: Future[Future[Int]] = Future(Success(Future(Success(3))))
To my surprise, it works. And I don't know why.
Question:
Why given one thread can it execute two Futures at the same time?
My expectation:
The first Future
(addTwo
) is occupying the one and only thread (newFixedThreadPool(1)
), then it calls another Future
(addOne
) which again needs another thread.
So the program should end up starved for threads and get stuck.
Scala supports multithreading, which means we can execute multiple threads at once. We can perform multiple operations independently and achieve multitasking. This lets us develop concurrent applications.
Future represents a result of an asynchronous computation that may or may not be available yet. When we create a new Future, Scala spawns a new thread and executes its code. Once the execution is finished, the result of the computation (value or exception) will be assigned to the Future.
Handle the result of a future with methods like onComplete , or combinator methods like map , flatMap , filter , andThen , etc. The value in a Future is always an instance of one of the Try types: Success or Failure.
Futures: The newer version of threads Futures are nothing but the wrappers on the concurrent computations that are either going to be successful with a value or fail with an exception. These are just like a placeholder object for a value that may not yet exist. So unlike Threads, Futures have a return type.
The reason that your code is working, is that both futures will be executed by the same thread. The ExecutionContext
that you are creating will not use a Thread
directly for each Future
but will instead schedule tasks (Runnable
instances) to be executed. In case no more threads are available in the pool these tasks will be put into a BlockingQueue
waiting to be executed. (See ThreadPoolExecutor API for details)
If you look at the implementation of Executors.newFixedThreadPool(1)
you'll see that creates an Executor with an unbounded queue:
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable])
To get the effect of thread-starvation that you were looking for, you could create an executor with a limited queue yourself:
implicit val ec = ExecutionContext.fromExecutor(new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue[Runnable](1)))
Since the minimal capacity of ArrayBlockingQueue
is 1 you would need three futures to reach the limit, and you would also need to add some code to be executed on the result of the future, to keep them from completing (in the example below I do this by adding .map(identity)
)
The following example
import scala.concurrent._
implicit val ec = ExecutionContext.fromExecutor(new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue[Runnable](1)))
def addOne(x: Int) = Future {
x + 1
}
def addTwo(x: Int) = Future {
addOne(x + 1) .map(identity)
}
def addThree(x: Int) = Future {
addTwo(x + 1).map(identity)
}
println(addThree(1))
fails with
java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@65a264b6 rejected from java.util.concurrent.ThreadPoolExecutor@10d078f4[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 1]
expand it to Promise
is easily to undunderstand
val p1 = Promise[Future[Int]]
ec.execute(() => {
// the fist task is start run
val p2 = Promise[Int]
//the second task is submit , but no run
ec.execute(() => {
p2.complete(Success(1))
println(s"task 2 -> p1:${p1},p2:${p2}")
})
//here the p1 is completed, not wait p2.future finish
p1.complete(Success(p2.future))
println(s"task 1 -> p1:${p1},p2:${p2}")// you can see the p1 is completed but the p2 have not
//first task is finish, will run second task
})
val result: Future[Future[Int]] = p1.future
Thread.sleep(1000)
println(result)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With