
Wait until all Future.onComplete callbacks are executed

Tags: scala, future

I am using the Future API from Scala 2.10.X.

Here is my use case:

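import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import org.joda.time.DateTime // assumption: DateTime comes from Joda-Time

object Class1 {

  def apply(f: (Int) => Future[String])(i: Int): Future[String] = {

    val start = DateTime.now

    val result = f(i)

    // Log the start and end times once the future completes, successfully or not
    result.onComplete{
      case _ => println("Started at " + start + ", ended at " + DateTime.now)
    }

    result
  }
}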

Pretty simple, I think: I'm adding an onComplete callback to my future. Now I'm wondering whether there is a way to add a callback for when the onComplete callback itself is done executing. In this example, that means knowing when the logging is done.

Let's say my result instance has 3 onComplete callbacks registered: can I know when all of them have been executed? I don't think it's possible, but who knows :)

Maybe an alternative would be to call map instead of onComplete to return a new instance of Future:

def apply(f: (Int) => Future[String])(i: Int): Future[String] = {

  val start = DateTime.now

  f(i) map {
    case r => 
      println("Started at " + start + ", ended at " + DateTime.now)
      r
  }
}

But I am not sure it would keep the same behavior.

Edit: Just to clarify - there is only one instance of Future, and I call onComplete 3 times on the same instance (Well, in my example only once, but let's say I'm calling it N times) and I want to know when the 3 callbacks are done executing due to the completion of the same Future instance.

asked Jan 17 '14 14:01 by vptheron


1 Answer

If you don't want to resort to other mechanisms (like a CountDownLatch), use andThen. It chains callbacks in order and returns a new Future that completes only after the callback has run, whether the original Future succeeded or failed and whether or not the callback itself throws.

scala> val f = Future(3)
f: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@4b49ca35

scala> val g = f andThen { case Success(i) => println(i) } andThen { case _ => println("All done") }
3
g: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@1939e13
All done
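
Applied to the question's apply method, this could look like the sketch below. The names AndThenTiming, timed, record and events are made up for illustration, and System.currentTimeMillis stands in for DateTime.now so that the snippet has no external dependency.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.collection.mutable.ListBuffer

object AndThenTiming {
  // Records the order in which the chained handlers run (illustration only).
  val events = ListBuffer.empty[String]

  private def record(e: String): Unit = events.synchronized { events += e; () }

  // Hypothetical variant of the question's apply. The returned future
  // completes only after both andThen handlers have run, in order.
  def timed(f: Int => Future[String])(i: Int): Future[String] = {
    val start = System.currentTimeMillis() // stand-in for DateTime.now
    f(i) andThen {
      case _ =>
        println("Started at " + start + ", ended at " + System.currentTimeMillis())
        record("logged")
    } andThen {
      case _ => record("all done") // runs after the logging handler
    }
  }

  def main(args: Array[String]): Unit = {
    // Blocking here is only for the demo; callers can keep chaining instead.
    val r = Await.result(timed(i => Future(i.toString))(3), 5.seconds)
    println(r + " " + events.synchronized(events.toList))
  }
}
```

Because the caller receives the future returned by the last andThen rather than the original one, anything chained onto it runs after the logging has finished.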

By contrast, if the future fails, a function passed to map isn't invoked, while andThen handlers still run:

scala> val f = Future[Int](???)
f: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@7001619b

scala> val g = f andThen { case t => println(s"stage 1 $t") } andThen { case _ => println("All done") }
stage 1 Failure(java.util.concurrent.ExecutionException: Boxed Error)
All done
g: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@24e1e7e8

scala> val g = f map { case i => println(i) } andThen { case _ => println("All done") }
All done
g: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@5d0f75d6

scala> val g = f map { case i => println(i) } map { case _ => println("All done") }
g: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@5aabe81f

scala> g.value
res1: Option[scala.util.Try[Unit]] = Some(Failure(java.util.concurrent.ExecutionException: Boxed Error))

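Similarly, an exception thrown in a chained andThen handler doesn't break subsequent operations, and the original result is preserved (judging by the final g.value, f here is the successful Future(3) again):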

scala> val g = f andThen { case t => null.hashCode } andThen { case _ => Thread.sleep(1000L); println("All done") }
java.lang.NullPointerException
    at $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.applyOrElse(<console>:51)
    at $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.applyOrElse(<console>:51)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:431)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:430)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
g: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@3fb7bec8

scala> All done


scala> g.value
res1: Option[scala.util.Try[Int]] = Some(Success(3))

For the unfortunate case where you do need to wait until every onComplete callback has run, a CountDownLatch works:

scala> val f = Future[Int](???)
f: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@859a977

scala> import java.util.concurrent.{ CountDownLatch => CDL }
import java.util.concurrent.{CountDownLatch=>CDL}

scala> val latch = new CDL(3)
latch: java.util.concurrent.CountDownLatch = java.util.concurrent.CountDownLatch@11683e9f[Count = 3]

scala> f onComplete { _ => println(1); latch.countDown() }
1

scala> f onComplete { _ => println(2); latch.countDown() }
2

scala> f onComplete { _ => println(3); latch.countDown() }
3

scala> f onComplete { _ => latch.await(); println("All done") }
All done
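
The last callback above blocks a pool thread in latch.await(). A non-blocking alternative, which is a different technique from the latch and only a sketch, is to give each callback its own Promise and combine the resulting futures with Future.sequence. The names CallbackTracker and onCompleteAll are hypothetical helpers, not part of the Scala library.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object CallbackTracker {
  // Registers every callback on the same future `f` and returns a future
  // that completes once all of them have finished running.
  def onCompleteAll[T](f: Future[T])(callbacks: Seq[Try[T] => Unit]): Future[Unit] = {
    val done: Seq[Future[Unit]] = callbacks.map { cb =>
      val p = Promise[Unit]()
      f.onComplete { result =>
        // Complete the promise whether the callback returns or throws;
        // a throwing callback makes the combined future fail.
        p.complete(Try(cb(result)))
      }
      p.future
    }
    Future.sequence(done).map(_ => ())
  }

  def main(args: Array[String]): Unit = {
    val counter = new AtomicInteger(0)
    val callbacks = Seq.fill(3)((_: Try[Int]) => { counter.incrementAndGet(); () })
    val all = onCompleteAll(Future(3))(callbacks)
    // Blocking only for the demo; normally you would chain onto `all`.
    Await.result(all, 5.seconds)
    println("callbacks run: " + counter.get)
  }
}
```

As with plain onComplete, the callbacks may run in any order; the combined future only tells you that all of them have finished.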
answered Sep 23 '22 08:09 by som-snytt