I know that I can transform a Seq[Future[T]] into a Future[Seq[T]] via
val seqFuture = Future.sequence(seqOfFutures)
seqFuture.map((seqT: Seq[T]) => {...})
My problem now is that I have 700 futures in that sequence, and I want to control how many of them are resolved in parallel: each future calls an internal REST API, and firing 700 requests at the same time is like launching a DoS attack against that server.
I would rather have only about 10 futures resolving at a time.
How can I achieve that?
Trying pamu's answer, I see these errors:
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:44: com.dreamlines.commons.LazyFuture[A] does not take parameters
[error] val batch = Future.sequence(c.map(_()))
[error] ^
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:28: no type parameters for method sequence: (in: M[scala.concurrent.Future[A]])(implicit cbf: scala.collection.generic.CanBuildFrom[M[scala.concurrent.Future[A]],A,M[A]], implicit executor: scala.concurrent.ExecutionContext)scala.concurrent.Future[M[A]] exist so that it can be applied to arguments (List[Nothing])
[error] --- because ---
[error] argument expression's type is not compatible with formal parameter type;
[error] found : List[Nothing]
[error] required: ?M[scala.concurrent.Future[?A]]
[error] val batch = Future.sequence(c.map(_()))
[error] ^
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:42: type mismatch;
[error] found : List[Nothing]
[error] required: M[scala.concurrent.Future[A]]
[error] val batch = Future.sequence(c.map(_()))
[error] ^
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:36: Cannot construct a collection of type M[A] with elements of type A based on a collection of type M[scala.concurrent.Future[A]].
[error] val batch = Future.sequence(c.map(_()))
[error] ^
[error] four errors found
Concurrency of Scala Futures is controlled by the ExecutionContext. Note that futures start executing on their context immediately after creation, so the ExecutionContext passed to Future.sequence doesn't really matter; you have to supply the appropriate context when creating the original futures in the sequence.
The default context ExecutionContext.global (usually imported via import scala.concurrent.ExecutionContext.Implicits.global) uses as many threads as there are processor cores, but it can also create many additional threads for blocking tasks that are wrapped in scala.concurrent.blocking. This is usually the desired behaviour, but it's not suitable for your problem.
Fortunately, you can use the ExecutionContext.fromExecutor method to wrap a Java thread pool with a fixed number of threads. For example:
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

val context = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

val seqOfFutures = Seq.fill(700)(Future { callRestApi() }(context))

// The context used by `sequence` itself doesn't matter, so any context
// will do, e.g. the default global one:
import scala.concurrent.ExecutionContext.Implicits.global
val sequenceFuture = Future.sequence(seqOfFutures)
The context can also be provided implicitly, of course:
implicit val context: ExecutionContext =
ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
val seqOfFutures = Seq.fill(700)(Future { callRestApi() })
// This `sequence` uses the same thread pool as the original futures
val sequenceFuture = Future.sequence(seqOfFutures)
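To see the throttling in action, here is a self-contained sketch under stated assumptions: callRestApi is a stand-in that just sleeps, and the atomic counters exist only to observe concurrency. With a fixed pool of 10 threads, no more than 10 of the futures can execute at any moment, however many are created.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Fixed pool of 10 threads: at most 10 futures execute simultaneously.
val pool = Executors.newFixedThreadPool(10)
implicit val context: ExecutionContext = ExecutionContext.fromExecutor(pool)

val running = new AtomicInteger(0) // tasks currently executing
val maxSeen = new AtomicInteger(0) // high-water mark of concurrency

// Placeholder for the real REST call: record concurrency, then sleep.
def callRestApi(i: Int): Int = {
  val now = running.incrementAndGet()
  maxSeen.getAndUpdate(m => math.max(m, now))
  Thread.sleep(20) // simulate network latency
  running.decrementAndGet()
  i
}

val futures = (1 to 100).map(i => Future(callRestApi(i)))
val results = Await.result(Future.sequence(futures), Duration.Inf)

println(s"completed: ${results.size}, max observed concurrency: ${maxSeen.get}")
pool.shutdown()
```

The trade-off of this approach is that the 10 threads are blocked while waiting on I/O; it caps concurrency by capping threads rather than by scheduling batches.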
A simple foldLeft can be used to control the number of futures that run concurrently at a time.
First, let's create a case class called LazyFuture:
case class LazyFuture[+A](f: Unit => Future[A]) {
  def apply(): Future[A] = f(())
}

object LazyFuture {
  def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] =
    LazyFuture(_ => Future(f))

  // No implicit ec on this overload: `f` is already a Future, and keeping the
  // implicit would give both overloads the same signature after erasure
  // (a double-definition compile error).
  def apply[A](f: => Future[A]): LazyFuture[A] =
    LazyFuture(_ => f)
}
LazyFuture keeps the wrapped future from starting immediately:
val list: List[LazyFuture[A]] = ...

list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) { (r, c) =>
  // Start the next batch only after the previous one has finished; creating
  // the batch outside flatMap would start all batches at once.
  r.flatMap(rs => Future.sequence(c.map(_())).map(values => rs ++ values))
}
Change concurFactor to control how many futures run at once: a concurFactor of 1 runs one future at a time, a concurFactor of 2 runs two at a time, and so on:
def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int)(implicit ec: ExecutionContext): Future[List[A]] =
  list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) { (r, c) =>
    r.flatMap(rs => Future.sequence(c.map(_())).map(values => rs ++ values))
  }
Putting it together:

case class LazyFuture[+A](f: Unit => Future[A]) {
  def apply(): Future[A] = f(())
}

object LazyFuture {
  def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] =
    LazyFuture(_ => Future(f))

  // No implicit ec here: `f` is already a Future, and a second implicit
  // parameter list would make both overloads erase to the same signature.
  def apply[A](f: => Future[A]): LazyFuture[A] =
    LazyFuture(_ => f)
}

def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int)(implicit ec: ExecutionContext): Future[List[A]] =
  list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) { (r, c) =>
    // Start the next batch only after the accumulated future has completed
    r.flatMap(rs => Future.sequence(c.map(_())).map(values => rs ++ values))
  }
You can also limit the computation resources by limiting the number of threads in the execution pool, but this solution is not as flexible; personally, I do not like it.

val context: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))

You have to remember to pass the correct execution context, which is an implicit value, and it is not always obvious which implicit is in scope. That makes it bug-prone.
When a future is constructed like below, it starts executing immediately:

val foo = Future {
  1 + 2
} // the future starts executing here

LazyFuture(foo) // not the right way

foo has already started executing and cannot be controlled.
The right way to construct a LazyFuture:
val foo = LazyFuture {
1 + 2
}
or
val foo = LazyFuture {
Future {
1 + 2
}
}
package main
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
object Main {
case class LazyFuture[A](f: Unit => Future[A]) {
def apply(): Future[A] = f()
}
object LazyFuture {
def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => Future(f))
def apply[A](f: => Future[A]): LazyFuture[A] = LazyFuture(_ => f)
}
def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int)
(implicit ec: ExecutionContext): Future[List[A]] =
    list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) { (r, c) =>
      // Start each batch only after the previous ones complete; the batch must
      // be created inside flatMap, and its result (not `r`) must be appended
      r.flatMap(rs => Future.sequence(c.map(_())).map(values => rs ++ values))
    }
def main(args: Array[String]): Unit = {
import scala.concurrent.ExecutionContext.Implicits.global
val futures: Seq[LazyFuture[Int]] = List(1, 2, 3, 4, 5).map { value =>
LazyFuture {
println(s"value: $value started")
Thread.sleep(value * 200)
println(s"value: $value stopped")
value
}
}
val f = executeBatch(futures.toList)(2)
Await.result(f, Duration.Inf)
}
}
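As an aside, the same batching can be done without the LazyFuture wrapper by using plain () => Future[A] thunks, which sidesteps the overload resolution problems that can produce errors like the one quoted in the question. A minimal, self-contained sketch (the values and sleep times are made up for illustration):

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Run the thunks in batches of `concurFactor`, starting a batch only after
// the previous batches have completed; results keep the input order.
def executeBatch[A](thunks: List[() => Future[A]])(concurFactor: Int)(
    implicit ec: ExecutionContext): Future[List[A]] =
  thunks.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) {
    (acc, group) =>
      acc.flatMap(done => Future.sequence(group.map(t => t())).map(done ++ _))
  }

// Nothing runs until executeBatch invokes each thunk.
val thunks: List[() => Future[Int]] =
  List(1, 2, 3, 4, 5).map(v => () => Future { Thread.sleep(50); v * 10 })

val result = Await.result(executeBatch(thunks)(2), Duration.Inf)
println(result) // List(10, 20, 30, 40, 50)
```

A function literal `() => Future(...)` defers execution exactly as LazyFuture does, without the extra case class or companion apply overloads.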