Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Futures / Success race

I'm learning futures, and I'm trying to create a method that, take two futures as parameter (f and g) and return the first future that was successfully completed, otherwise it returns f or g.

Some use cases to illustrate the behaviour of my method are :

Future 1        | Future 2         | Result
Success First     Success Second     Future 1
Success First     Failure Second     Future 1
Success Second    Success First      Future 2
Success Second    Failure First      Future 1
Failure First     Failure Second     Future 2 (because we had a failure on Future 1, so try to see what is the result Future 2)

So I created this method :

def successRace(f: Future[T], g: Future[T]): Future[T] = {
        val p1 = Promise[T]()
        val p2 = Promise[T]()
        val p3 = Promise[T]()
        p1.completeWith(f)
        p2.completeWith(g)
        p3. ????
        p3.future
}

And now, how can I know which one completed first ?

like image 636
user2336315 Avatar asked Dec 18 '13 13:12

user2336315


2 Answers

The use case is the first successful completion:

scala> :pa
// Entering paste mode (ctrl-D to finish)

def firstSuccessOf[T](fs: Future[T]*)(implicit x: ExecutionContext): Future[T] = {
  val p = Promise[T]()
  val count = new java.util.concurrent.atomic.AtomicInteger(fs.size)
  def bad() = if (count.decrementAndGet == 0) { p tryComplete new Failure(new RuntimeException("All bad")) }
  val completeFirst: Try[T] => Unit = p tryComplete _
  fs foreach { _ onComplete { case v @ Success(_) => completeFirst(v) case _ => bad() }}
  p.future
}

// Exiting paste mode, now interpreting.

firstSuccessOf: [T](fs: scala.concurrent.Future[T]*)(implicit x: scala.concurrent.ExecutionContext)scala.concurrent.Future[T]

so

scala> def f = Future { Thread sleep 5000L ; println("Failing") ; throw new NullPointerException }
f: scala.concurrent.Future[Nothing]

scala> def g = Future { Thread sleep 10000L ; println("OK") ; 7 }
g: scala.concurrent.Future[Int]

scala> firstSuccessOf(f,g)
res3: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@5ed53f6b

scala> res0Failing
          3.value
res4: Option[scala.util.Try[Int]] = None

scala> res3.valueOK

res5: Option[scala.util.Try[Int]] = Some(Success(7))

or

scala> def h = Future { Thread sleep 7000L ; println("Failing too") ; throw new NullPointerException }
h: scala.concurrent.Future[Nothing]


scala> firstSuccessOf(f,h)
res10: scala.concurrent.Future[Nothing] = scala.concurrent.impl.Promise$DefaultPromise@318d30be

scala> 

scala> res10.Failing
value
res11: Option[scala.util.Try[Nothing]] = None

scala> Failing too


scala> res10.value
res12: Option[scala.util.Try[Nothing]] = Some(Failure(java.lang.RuntimeException: All bad))

@ ysusuk 's answer is what Future.firstCompletedOf does under the hood.

like image 61
som-snytt Avatar answered Sep 30 '22 20:09

som-snytt


You want to use the tryCompleteWith method. It can be called multiple times and only the first completing future wins.

def successRace(f: Future[T], g: Future[T]): Future[T] = {
  val p = Promise[T]()
  p.tryCompleteWith(f)
  p.tryCompleteWith(g)
  p.future
}
like image 22
joescii Avatar answered Sep 30 '22 19:09

joescii