I thought that using futures would easily allow me to fire off one-shot code blocks, but it seems I can only have 4 futures running at a time.
Where does this restriction come from, or am I abusing Futures by using them like this?
import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import java.util.Calendar

object Main extends App {
  val rand = scala.util.Random
  for (x <- 1 to 100) {
    val f = Future {
      //val sleepTime = rand.nextInt(1000)
      val sleepTime = 2000
      Thread.sleep(sleepTime)
      val today = Calendar.getInstance().getTime()
      println("Future: " + x + " - sleep was: " + sleepTime + " - " + today)
      1;
    }
  }
  Thread.sleep(10000)
}
Output:
Future: 3 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015
Future: 2 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015
Future: 4 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015
Future: 1 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015
Future: 7 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015
Future: 5 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015
Future: 6 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015
Future: 8 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015
Future: 9 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015
Future: 11 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015
Future: 10 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015
Future: 12 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015
Future: 16 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015
Future: 13 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015
Future: 15 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015
Future: 14 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015
I expected them to all show the same time.
To give some context, I thought I could use this construct and extend it with a main loop that sleeps on every iteration for a duration drawn from an exponential distribution, to emulate user arrival and execution of a query. After each sleep I'd like to execute the query by sending it to the program's driver (in this case Spark, whose driver allows multiple threads to use it). Is there a more obvious way than to use Futures?
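For illustration, the arrival loop described above could be sketched roughly as follows. This is only a sketch under assumptions: `ArrivalSim`, `nextGapMs`, `runQuery` and `simulate` are hypothetical names, and `runQuery` is a stand-in that just sleeps briefly instead of calling a real driver.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Random

object ArrivalSim {
  // Inverse-transform sample from an exponential distribution with the given mean.
  // nextDouble() is in [0, 1), so the argument of log is in (0, 1] and the gap is >= 0.
  def nextGapMs(rng: Random, meanMs: Double): Long =
    math.round(-meanMs * math.log(1.0 - rng.nextDouble()))

  // Hypothetical stand-in for sending a query to the driver.
  def runQuery(id: Int): Int = {
    blocking { Thread.sleep(20) }
    id
  }

  // Sleeps between arrivals on the calling thread and fires one Future per arrival.
  def simulate(users: Int, meanGapMs: Double, seed: Long): Seq[Int] = {
    val rng = new Random(seed)
    val futures = (1 to users).map { id =>
      Thread.sleep(nextGapMs(rng, meanGapMs)) // wait for the next arrival
      Future(runQuery(id))
    }
    Await.result(Future.sequence(futures), 1.minute)
  }

  def main(args: Array[String]): Unit =
    println(simulate(users = 10, meanGapMs = 100.0, seed = 42L))
}
```

The sleep between arrivals happens in the main loop, so only the query itself runs inside the Future.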
Scala concurrency is built on top of the Java concurrency model. On Sun JVMs, with an I/O-heavy workload, we can run tens of thousands of threads on a single machine. A Thread takes a Runnable. You have to call start on a Thread in order for it to run the Runnable.
The simplest way to create a future object is to invoke the Future.apply method, which starts an asynchronous computation and returns a future holding the result of that computation. The result becomes available once the future completes.
A Future represents the result of an asynchronous computation that may or may not be available yet. When we create a new Future, Scala schedules its body on an ExecutionContext, which usually runs it on a thread taken from a pool rather than spawning a new thread each time. Once the execution is finished, the result of the computation (value or exception) is assigned to the Future.
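As a small illustration of the "value or exception" part, here is a minimal sketch. The names `FutureBasics`, `compute` and `describe` are made up for this example, and `Await.ready` is used only so the outcome can be inspected in a short demo.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object FutureBasics {
  // Future.apply schedules the body on the implicit ExecutionContext.
  def compute(a: Int, b: Int): Future[Int] = Future { a / b }

  // Waits for completion, then inspects whether the Future holds a value or an exception.
  def describe(a: Int, b: Int): String =
    Await.ready(compute(a, b), 5.seconds).value.get match {
      case Success(v) => s"value: $v"
      case Failure(e) => s"failed: ${e.getClass.getSimpleName}"
    }

  def main(args: Array[String]): Unit = {
    println(describe(6, 3))
    println(describe(1, 0)) // integer division by zero fails the Future
  }
}
```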
A Java Future (java.util.concurrent.Future) only exposes its result through the blocking get method, whereas a Scala Future supports non-blocking callbacks and combinators. If we want asynchronous, non-blocking composition in Java, we should use Java 8's CompletableFuture.
When you use import ExecutionContext.Implicits.global, it creates a thread pool with as many threads as there are CPUs. Your output shows four futures finishing at a time, which suggests your machine reports 4 processors.
From the source of ExecutionContext.scala:
The default ExecutionContext implementation is backed by a work-stealing thread pool. By default, the thread pool uses a target number of worker threads equal to the number of available processors (https://docs.oracle.com/javase/8/docs/api/java/lang/Runtime.html#availableProcessors--).
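To see what number that is on your machine, you can ask the JVM directly. A minimal sketch (the object name `PoolSizeCheck` is just for this example):

```scala
object PoolSizeCheck {
  // Number of logical processors the JVM reports. Per the quoted docs, the
  // global pool targets this many worker threads by default.
  def defaultParallelism: Int = Runtime.getRuntime.availableProcessors

  def main(args: Array[String]): Unit =
    println(s"available processors: $defaultParallelism")
}
```

If this prints 4, that would match the batches of four in the question's output.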
There is also a good Stack Overflow question on this: What is the behavior of scala.concurrent.ExecutionContext.Implicits.global?
Since the default size of the thread pool depends on the number of CPUs, if you want to use a larger thread pool, you have to write something like
import scala.concurrent.ExecutionContext
import java.util.concurrent.Executors
implicit val ec = ExecutionContext.fromExecutorService(Executors.newWorkStealingPool(8))
before executing the Future. (In your code, you have to place it before the for loop.)
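Put together, a version with a dedicated pool might look roughly like this. It is a sketch, not the only way to do it: `CustomPoolDemo` and `run` are names I made up, and it uses Await on Future.sequence instead of a fixed sleep in the main thread so the program waits exactly as long as needed.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object CustomPoolDemo {
  // Runs `n` sleeping tasks on a dedicated work-stealing pool with the given
  // parallelism and returns (results, elapsed milliseconds).
  def run(n: Int, parallelism: Int, sleepMs: Long): (Seq[Int], Long) = {
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newWorkStealingPool(parallelism))
    try {
      val start = System.nanoTime()
      val futures = (1 to n).map { x =>
        Future {
          Thread.sleep(sleepMs)
          x
        }
      }
      // Wait for all futures instead of sleeping for a fixed time.
      val results = Await.result(Future.sequence(futures), 1.minute)
      (results, (System.nanoTime() - start) / 1000000)
    } finally ec.shutdown() // release the pool's threads
  }

  def main(args: Array[String]): Unit = {
    val (results, elapsed) = run(n = 8, parallelism = 8, sleepMs = 2000)
    println(s"completed ${results.size} futures in about $elapsed ms")
  }
}
```

With n no larger than the parallelism, all tasks should sleep at the same time; with a larger n they would again run in batches.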
Note that Executors.newWorkStealingPool was added in Java 8. Scala has its own ForkJoinPool, which also does work stealing: scala.concurrent.forkjoin.ForkJoinPool vs java.util.concurrent.ForkJoinPool
Also, if you want one thread per Future, you can create a separate single-thread executor for each Future, with something like
implicit val ec = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor)
Therefore, the following code executes 100 threads in parallel:
import scala.concurrent._
import java.util.concurrent.Executors
import java.util.Calendar

object Main extends App {
  for (x <- 1 to 100) {
    // A fresh single-thread executor per iteration, so every Future gets its own thread.
    implicit val ec = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor)
    val f = Future {
      val sleepTime = 2000
      Thread.sleep(sleepTime)
      val today = Calendar.getInstance().getTime()
      println("Future: " + x + " - sleep was: " + sleepTime + " - " + today)
      1;
    }
    // The submitted task still runs; afterwards the thread ends so the JVM can exit.
    ec.shutdown()
  }
  Thread.sleep(10000)
}
In addition to the work-stealing thread pool and single-thread executors, there are other executors: http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html
Read the docs for detail: http://docs.scala-lang.org/overviews/core/futures.html
The default pool when using import scala.concurrent.ExecutionContext.Implicits.global indeed has as many threads as you have cores on your machine. This is ideal for non-blocking code (no synchronous I/O, sleep, ...) but can be problematic and even cause deadlocks when you use it for blocking code.
However, this pool can actually grow if you mark blocking code with a scala.concurrent.blocking block. The same marker is used, for example, by the Await.result and Await.ready functions, which block while waiting for a Future.
See the API docs for blocking.
So all you have to do is update your example:
import scala.concurrent.blocking
...
val sleepTime = 2000
blocking {
  Thread.sleep(sleepTime)
}
...
Now all futures will finish after about 2000 ms.
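For completeness, a self-contained version of that change could look like the sketch below. `BlockingDemo` and `run` are names chosen for this example, and the exact elapsed time will depend on your machine and on how many extra threads the pool is willing to create.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingDemo {
  // Starts `n` futures on the global pool, each sleeping inside `blocking`,
  // and returns (results, elapsed milliseconds).
  def run(n: Int, sleepMs: Long): (Seq[Int], Long) = {
    val start = System.nanoTime()
    val futures = (1 to n).map { x =>
      Future {
        // Tells the pool this section blocks, so it may add a compensating thread.
        blocking { Thread.sleep(sleepMs) }
        x
      }
    }
    val results = Await.result(Future.sequence(futures), 1.minute)
    (results, (System.nanoTime() - start) / 1000000)
  }

  def main(args: Array[String]): Unit = {
    val (results, elapsed) = run(n = 100, sleepMs = 2000)
    println(s"completed ${results.size} futures in about $elapsed ms")
  }
}
```

Removing the blocking wrapper and running it again is an easy way to compare the two behaviours on your own machine.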