Scala Future/Promise fast-fail pipeline

I want to launch two or more Futures/Promises in parallel and fail as soon as any one of them fails, without waiting for the rest to complete. What is the most idiomatic way to compose this pipeline in Scala?

EDIT: more contextual information.

I have to launch two external processes, one writing to a FIFO file and the other reading from it. If the writer process fails, the reader thread might hang forever waiting for input from the file. So I want to launch both processes in parallel and fail fast as soon as either Future/Promise fails, without waiting for the other to complete.

Below is some sample code to be more precise. The commands are not exactly cat and tail; I have used them for brevity.

val future1 = Future { executeShellCommand("cat file.txt > fifo.pipe") }
val future2 = Future { executeShellCommand("tail fifo.pipe") }

rogue-one asked Sep 11 '16 15:09


2 Answers

If I understand the question correctly, what we are looking for is a fail-fast sequence implementation, which is akin to a failure-biased version of firstCompletedOf.

Here, we eagerly register a failure callback on every future, ensuring that the result fails as soon as any of the futures fails.

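import scala.concurrent.{Future, Promise}
import scala.util.{Success, Failure}
import scala.concurrent.ExecutionContext.Implicits.global
def failFast[T](futures: Seq[Future[T]]): Future[Seq[T]] = {
  val promise = Promise[Seq[T]]()
  // Fail the promise on the first failure. tryFailure is a no-op if the promise
  // is already completed, whereas failure would throw IllegalStateException.
  // (onFailure is deprecated since Scala 2.12 and removed in 2.13.)
  futures.foreach{f => f.onComplete{
    case Failure(ex) => promise.tryFailure(ex)
    case _ => ()
  }}
  // Otherwise complete with the collected results once every future has succeeded
  promise.completeWith(Future.sequence(futures))
  promise.future
}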

In contrast to Future.sequence, this implementation will fail as soon as any of the futures fails, regardless of ordering. Let's show that with an example:

import scala.util.Try
// helper method to measure elapsed time (in milliseconds) while capturing the outcome
def resilientTime[T](t: =>T):(Try[T], Long) = {
  val t0 = System.currentTimeMillis
  val res = Try(t)
  (res, System.currentTimeMillis-t0)
}

import scala.concurrent.duration._
import scala.concurrent.Await

First future will fail (failure in 2 seconds)

val f1 = Future[Int]{Thread.sleep(2000); throw new Exception("boom")}
val f2 = Future[Int]{Thread.sleep(5000); 42}
val f3 = Future[Int]{Thread.sleep(10000); 101}
val res = failFast(Seq(f1,f2,f3))

resilientTime(Await.result(res, 10.seconds))
// res: (scala.util.Try[Seq[Int]], Long) = (Failure(java.lang.Exception: boom),1998)

Last future will fail. Failure also in 2 seconds. (note the order in the sequence construction)

val f1 = Future[Int]{Thread.sleep(2000); throw new Exception("boom")}
val f2 = Future[Int]{Thread.sleep(5000); 42}
val f3 = Future[Int]{Thread.sleep(10000); 101}
val res = failFast(Seq(f3,f2,f1))

resilientTime(Await.result(res, 10.seconds))
// res: (scala.util.Try[Seq[Int]], Long) = (Failure(java.lang.Exception: boom),1998)

Comparing with Future.sequence where failure depends on the ordering (failure in 10 seconds):

val f1 = Future[Int]{Thread.sleep(2000); throw new Exception("boom")}
val f2 = Future[Int]{Thread.sleep(5000); 42}
val f3 = Future[Int]{Thread.sleep(10000); 101}
val seq = Seq(f3,f2,f1)

resilientTime(Await.result(Future.sequence(seq), 10.seconds))
// res: (scala.util.Try[Seq[Int]], Long) = (Failure(java.lang.Exception: boom),10000)
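
One case the timed examples above do not cover is more than one future failing. The following is a minimal sketch, not the author's code: it re-declares the helper under the hypothetical name failFastSafe so the snippet is self-contained, and it assumes onComplete with tryFailure is an acceptable way to register the callback. Promises stand in for real work so the completion order is explicit rather than timing-dependent.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Try}

// Hypothetical name; re-declared here so the snippet runs on its own.
// tryFailure returns false (and does nothing) if the promise is already completed.
def failFastSafe[T](futures: Seq[Future[T]]): Future[Seq[T]] = {
  val promise = Promise[Seq[T]]()
  futures.foreach(_.onComplete {
    case Failure(ex) => promise.tryFailure(ex)
    case _           => ()
  })
  promise.completeWith(Future.sequence(futures))
  promise.future
}

// Three independent promises; none is completed yet.
val p1, p2, p3 = Promise[Int]()
val combined = failFastSafe(Seq(p3.future, p2.future, p1.future))

// The last future in the sequence fails first: the aggregate fails with it.
p1.failure(new Exception("first"))
val outcome = Try(Await.result(combined, 2.seconds))

// A later failure is ignored because the aggregate is already completed.
p2.failure(new Exception("second"))
```

Here outcome holds the "first" exception even though p3 is never completed, and the later failure of p2 does not change the result.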

maasg answered Oct 17 '22 10:10


Use Future.sequence:

val both = Future.sequence(Seq(
  firstFuture,
  secondFuture))

This is the correct way to aggregate two or more futures so that the failure of one fails the aggregated future, and the aggregated future completes when all inner futures complete. An older version of this answer suggested a for-comprehension, which, while very common, would not reject immediately if one of the futures rejects but would instead wait for the futures before it.
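
The for-comprehension behaviour mentioned above can be checked with a small sketch. It is illustrative only: the names slow, failed and viaFor are made up for the example, and a Promise stands in for a future that is still running.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// `slow` stands in for a future that is still running; `failed` has already failed.
val slow = Promise[Int]()
val failed: Future[Int] = Future.failed(new Exception("boom"))

// Desugars to slow.future.flatMap(a => failed.map(b => a + b)),
// so `failed` is not inspected until `slow` has completed.
val viaFor = for {
  a <- slow.future
  b <- failed
} yield a + b

// Still pending, although one of the inputs has already failed.
val completedEarly = viaFor.isCompleted

// Only once the first future completes can the failure surface.
slow.success(1)
Await.ready(viaFor, 2.seconds)
```

The combined future stays pending until slow is completed, and only then fails with "boom".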

Benjamin Gruenbaum answered Oct 17 '22 08:10