
How to control the concurrency of Future.sequence in Scala?

I know that I can transform a Seq[Future[T]] into a Future[Seq[T]] via

  val seqFuture = Future.sequence(seqOfFutures)
  seqFuture.map((seqT: Seq[T]) => {...})

My problem is that I have 700 futures in that sequence, and I want to control how many of them are resolved in parallel, since each future calls an internal REST API and firing 700 requests at once would be like launching a DoS attack against that server.

I would rather have only something like 10 futures resolved at a time.

How can I achieve that?


Trying pamu's answer, I see this error:

[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:44: com.dreamlines.commons.LazyFuture[A] does not take parameters
[error]         val batch = Future.sequence(c.map(_()))
[error]                                            ^
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:28: no type parameters for method sequence: (in: M[scala.concurrent.Future[A]])(implicit cbf: scala.collection.generic.CanBuildFrom[M[scala.concurrent.Future[A]],A,M[A]], implicit executor: scala.concurrent.ExecutionContext)scala.concurrent.Future[M[A]] exist so that it can be applied to arguments (List[Nothing])
[error]  --- because ---
[error] argument expression's type is not compatible with formal parameter type;
[error]  found   : List[Nothing]
[error]  required: ?M[scala.concurrent.Future[?A]]
[error]         val batch = Future.sequence(c.map(_()))
[error]                            ^
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:42: type mismatch;
[error]  found   : List[Nothing]
[error]  required: M[scala.concurrent.Future[A]]
[error]         val batch = Future.sequence(c.map(_()))
[error]                                          ^
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:36: Cannot construct a collection of type M[A] with elements of type A based on a collection of type M[scala.concurrent.Future[A]].
[error]         val batch = Future.sequence(c.map(_()))
[error]                                    ^
[error] four errors found
Asked Apr 19 '18 by k0pernikus



2 Answers

Concurrency of Scala Futures is controlled by the ExecutionContext. Note that futures start executing on the context immediately after creation, so the ExecutionContext passed to Future.sequence doesn't really matter. You have to supply the appropriate context when creating the original futures in the sequence.

The default context ExecutionContext.global (usually imported through import scala.concurrent.ExecutionContext.Implicits.global) uses as many threads as there are processor cores, but it can also create many additional threads for blocking tasks that are wrapped in scala.concurrent.blocking. This is usually the desired behaviour, but it is not suitable for your problem.
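
For illustration, a minimal sketch of such a wrapped blocking task on the global context (the Thread.sleep is just a stand-in for any blocking call):

import scala.concurrent.{Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global

// blocking { ... } hints the global pool to spawn an extra thread
// instead of starving its fixed set of workers
val slow = Future {
  blocking {
    Thread.sleep(1000) // stand-in for a blocking call
    42
  }
}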

Fortunately, you can use ExecutionContext.fromExecutor method to wrap a Java thread pool. For example:

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// A fixed pool of 10 threads: at most 10 futures execute at the same time
val context = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
val seqOfFutures = Seq.fill(700)(Future { callRestApi() }(context))
val sequenceFuture = Future.sequence(seqOfFutures)(ExecutionContext.global)

Of course, the context can also be provided implicitly:

implicit val context: ExecutionContext = 
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
val seqOfFutures = Seq.fill(700)(Future { callRestApi() })
// This `sequence` uses the same thread pool as the original futures
val sequenceFuture = Future.sequence(seqOfFutures) 
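
For completeness, a hedged sketch of putting this together for the 700 calls and releasing the pool afterwards (callRestApi is the same placeholder as above; shutdown is the standard java.util.concurrent.ExecutorService method):

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

val pool = Executors.newFixedThreadPool(10) // at most 10 requests in flight
implicit val context: ExecutionContext = ExecutionContext.fromExecutor(pool)

def callRestApi(): Int = ??? // placeholder for the real REST call

val seqOfFutures = Seq.fill(700)(Future { callRestApi() })
val sequenceFuture = Future.sequence(seqOfFutures)

sequenceFuture.onComplete(_ => pool.shutdown()) // free the threads once everything is done
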
Answered by Kolmar


FoldLeft

A simple foldLeft can be used to control the number of futures that run concurrently at a time.

First, let's create a case class called LazyFuture:

case class LazyFuture[+A](f: Unit => Future[A]) {
  def apply(): Future[A] = f(())
}

object LazyFuture {
  def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => Future(f))

  // no implicit ExecutionContext here: the two by-name overloads would otherwise
  // erase to the same signature and fail to compile
  def apply[A](f: => Future[A]): LazyFuture[A] = LazyFuture(_ => f)
}

LazyFuture stops the future from running immediately.
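
A quick sketch to make the deferred start visible (the println is only there for illustration):

import scala.concurrent.ExecutionContext.Implicits.global

val lf = LazyFuture {
  println("started") // not printed yet: no Future has been created
  42
}

lf() // only now is the underlying Future created and run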

val list: List[LazyFuture[A]] = ...


list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) { (r, c) =>
  // build the batch inside flatMap, so the futures of a batch are only
  // created (and started) once the previous batch has completed
  r.flatMap(rs => Future.sequence(c.map(_())).map(values => rs ++ values))
}

Adjust concurFactor to control how many futures run at once:

a concurFactor of 1 runs one future at a time

a concurFactor of 2 runs two futures at a time

and so on ...

def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int) =
  list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) { (r, c) =>
    // start each batch only after the previous batch has finished
    r.flatMap(rs => Future.sequence(c.map(_())).map(values => rs ++ values))
  }

Complete code

  case class LazyFuture[+A](f: Unit => Future[A]) {
    def apply(): Future[A] = f(())
  }

  object LazyFuture {
    def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => Future(f))

    // no implicit ExecutionContext here, so the two by-name overloads
    // do not clash after erasure
    def apply[A](f: => Future[A]): LazyFuture[A] = LazyFuture(_ => f)
  }

  def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int)(implicit ec: ExecutionContext): Future[List[A]] =
    list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) { (r, c) =>
      // start each batch only after the previous batch has finished
      r.flatMap(rs => Future.sequence(c.map(_())).map(values => rs ++ values))
    }
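
To connect this back to the question, here is a hedged sketch of batching the 700 REST calls with at most 10 in flight per batch, using the LazyFuture and executeBatch defined above (fetchFromApi is a hypothetical stand-in for the internal API call):

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

def fetchFromApi(id: Int): Future[String] = ??? // hypothetical REST call

val jobs: List[LazyFuture[String]] =
  (1 to 700).toList.map(id => LazyFuture(fetchFromApi(id)))

// runs the calls in batches of 10; a new batch starts only after the previous one completes
val all: Future[List[String]] = executeBatch(jobs)(10)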

Limiting the execution context

You can also limit computation resources by limiting the number of threads in the execution pool, but this solution is not as flexible. Personally, I do not like it.

import java.util.concurrent.Executors

val context: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))

You have to remember to pass the correct execution context, which is an implicit value. Sometimes it is not obvious which implicit is in scope, which makes this approach error-prone.

Warning

When a future is constructed like this, it starts executing right away:

val foo = Future {
  1 + 2
} // the future starts executing here

LazyFuture(foo) // not the right way: foo is already running

foo has already started executing and cannot be controlled.

The right way to construct a LazyFuture is:

val foo = LazyFuture {
  1 + 2
}

or

val foo = LazyFuture {
  Future {
   1 + 2
  }
}

Working example

package main

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object Main {

  case class LazyFuture[A](f: Unit => Future[A]) {
    def apply(): Future[A] = f(())
  }

  object LazyFuture {
    def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => Future(f))
    def apply[A](f: => Future[A]): LazyFuture[A] = LazyFuture(_ => f)
  }

  def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int)
    (implicit ec: ExecutionContext): Future[List[A]] =
    list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) { (r, c) =>
      // start each batch only after the previous batch has finished
      r.flatMap(rs => Future.sequence(c.map(_())).map(values => rs ++ values))
    }

  def main(args: Array[String]): Unit = {
    import scala.concurrent.ExecutionContext.Implicits.global


    val futures: Seq[LazyFuture[Int]] = List(1, 2, 3, 4, 5).map { value =>
      LazyFuture {
        println(s"value: $value started")
        Thread.sleep(value * 200)
        println(s"value: $value stopped")
        value
      }
    }
    val f = executeBatch(futures.toList)(2)
    Await.result(f, Duration.Inf)
  }

}
Answered by pamu