Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why Future.sequence executes my futures in parallel rather than in series?

Tags:

scala

future

The word "sequence" means a series of actions one after the other.

object Test {

  def main(args: Array[String]) {

    def producer() = {
      val list = Seq(
          future { println("startFirst"); Thread.sleep(3000); println("stopFirst") }, 
          future { println("startSecond"); Thread.sleep(1000); println("stopSecond") }
      )
      Future.sequence(list)
    }

   Await.result(producer, Duration.Inf)
  }
}

Therefore I expect this program to print: startFirst stopFirst startSecond stopSecond

or even: startSecond stopSecond startFirst stopFirst

but not (as it happens): startFirst startSecond stopSecond stopFirst

Why this method is not called Future.parallel()? And what should I use to guarantee that all futures in a Seq of futures are triggered serially (as opposed to in parallel) ?

like image 680
sscarduzio Avatar asked Jul 31 '14 10:07

sscarduzio


People also ask

What Future sequence does?

This Future. sequence() function converts a list of Futures into a single Future that means collections of Futures into a single Future. In simple words, List[Future[T]] ======> Future[List[T]] . It is also known as composing Futures.

What is Scala Futures?

Future represents a result of an asynchronous computation that may or may not be available yet. When we create a new Future, Scala spawns a new thread and executes its code. Once the execution is finished, the result of the computation (value or exception) will be assigned to the Future.

What is promise in Scala?

Promise is an object which can be completed with a value or failed with an exception. A promise should always eventually be completed, whether for success or failure, in order to avoid unintended resource retention for any associated Futures' callbacks or transformations. Source Promise.scala. AnyRef, Any. Promise.


3 Answers

The futures are running concurrently because they have been started concurrently :). To run them sequentially you need to use flatMap:

Future { println("startFirst");           Thread.sleep(3000);           println("stopFirst")          }.flatMap{          _ =>  Future {                         println("startSecond");                         Thread.sleep(1000);                         println("stopSecond")                 }         } 

Future.sequence just turns Seq[Future[T]] => Future[Seq[T]] which means gather results of all already started futures and put it in future .

like image 97
grotrianster Avatar answered Sep 21 '22 23:09

grotrianster


a little change to the original Future.sequence will make the future execution serialized:

def seq[A, M[X] <: TraversableOnce[X]](in: M[() => Future[A]])(implicit cbf: CanBuildFrom[M[()=>Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
    in.foldLeft(Future.successful(cbf(in))) {
       (fr, ffa) => for (r <- fr; a <- ffa()) yield (r += a)
    } map (_.result())
}

and your code will look like this:

object Test {
def main(args: Array[String]) {

    def producer() = {
      val list = Seq(
          {() => future { println("startFirst"); Thread.sleep(3000); println("stopFirst") }}, 
          {() => future { println("startSecond"); Thread.sleep(1000); println("stopSecond") }}
      )
      FutureExt.seq(list)
    }

    Await.result(producer, Duration.Inf)
    }
}

which is very like your original code and with same result collection with the original Future.sequence()

like image 25
eagle yuan Avatar answered Sep 19 '22 23:09

eagle yuan


Linearize:

import scala.concurrent._
import scala.collection.mutable.Builder
import scala.collection.generic.CanBuildFrom
import language.higherKinds

/**
 * Linearize asynchronously applies a given function in-order to a sequence of values, producing a Future with the result of the function applications.
 * Execution of subsequent entries will be aborted if an exception is thrown in the application of the function.
 */
def linearize[T, U, C[T] <: Traversable[T]](s: C[T])(f: T => U)(implicit cbf: CanBuildFrom[C[T], U, C[U]], e: ExecutionContext): Future[C[U]] = {
  def next(i: Iterator[T], b: Builder[U, C[U]]): Future[C[U]] = if(!i.hasNext) Future successful b.result else Future { b += f(i.next()) } flatMap { b => next(i, b) }
  next(s.toIterator, cbf(s))
}

scala> linearize(1 to 100)(_.toString) foreach println

scala> Vector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)

From: https://gist.github.com/viktorklang/3347939

like image 35
Viktor Klang Avatar answered Sep 18 '22 23:09

Viktor Klang