The word "sequence" means a series of actions one after the other.
object Test {
def main(args: Array[String]) {
def producer() = {
val list = Seq(
future { println("startFirst"); Thread.sleep(3000); println("stopFirst") },
future { println("startSecond"); Thread.sleep(1000); println("stopSecond") }
)
Future.sequence(list)
}
Await.result(producer, Duration.Inf)
}
}
Therefore I expect this program to print:
startFirst
stopFirst
startSecond
stopSecond
or even:
startSecond
stopSecond
startFirst
stopFirst
but not (as it happens):
startFirst
startSecond
stopSecond
stopFirst
Why this method is not called Future.parallel()
?
And what should I use to guarantee that all futures in a Seq
of futures are triggered serially (as opposed to in parallel) ?
This Future. sequence() function converts a list of Futures into a single Future that means collections of Futures into a single Future. In simple words, List[Future[T]] ======> Future[List[T]] . It is also known as composing Futures.
Future represents a result of an asynchronous computation that may or may not be available yet. When we create a new Future, Scala spawns a new thread and executes its code. Once the execution is finished, the result of the computation (value or exception) will be assigned to the Future.
Promise is an object which can be completed with a value or failed with an exception. A promise should always eventually be completed, whether for success or failure, in order to avoid unintended resource retention for any associated Futures' callbacks or transformations. Source Promise.scala. AnyRef, Any. Promise.
The futures are running concurrently because they have been started concurrently :). To run them sequentially you need to use flatMap:
Future { println("startFirst"); Thread.sleep(3000); println("stopFirst") }.flatMap{ _ => Future { println("startSecond"); Thread.sleep(1000); println("stopSecond") } }
Future.sequence just turns Seq[Future[T]] => Future[Seq[T]]
which means gather results of all already started futures and put it in future .
a little change to the original Future.sequence will make the future execution serialized:
def seq[A, M[X] <: TraversableOnce[X]](in: M[() => Future[A]])(implicit cbf: CanBuildFrom[M[()=>Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
in.foldLeft(Future.successful(cbf(in))) {
(fr, ffa) => for (r <- fr; a <- ffa()) yield (r += a)
} map (_.result())
}
and your code will look like this:
object Test {
def main(args: Array[String]) {
def producer() = {
val list = Seq(
{() => future { println("startFirst"); Thread.sleep(3000); println("stopFirst") }},
{() => future { println("startSecond"); Thread.sleep(1000); println("stopSecond") }}
)
FutureExt.seq(list)
}
Await.result(producer, Duration.Inf)
}
}
which is very like your original code and with same result collection with the original Future.sequence()
Linearize:
import scala.concurrent._
import scala.collection.mutable.Builder
import scala.collection.generic.CanBuildFrom
import language.higherKinds
/**
* Linearize asynchronously applies a given function in-order to a sequence of values, producing a Future with the result of the function applications.
* Execution of subsequent entries will be aborted if an exception is thrown in the application of the function.
*/
def linearize[T, U, C[T] <: Traversable[T]](s: C[T])(f: T => U)(implicit cbf: CanBuildFrom[C[T], U, C[U]], e: ExecutionContext): Future[C[U]] = {
def next(i: Iterator[T], b: Builder[U, C[U]]): Future[C[U]] = if(!i.hasNext) Future successful b.result else Future { b += f(i.next()) } flatMap { b => next(i, b) }
next(s.toIterator, cbf(s))
}
scala> linearize(1 to 100)(_.toString) foreach println
scala> Vector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
From: https://gist.github.com/viktorklang/3347939
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With