Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

failure in Scala future's for comprehension

Tags:

scala

future

I have three sequential Futures and use in the for comprehension like this

val comF = for {
  f1 <- future1
  f2 <- future2
  f3 <- future3
} yield {
  // something
}

comF onSuccess { }
comF onFailure { 
  // ----------------      Here is the problem  --------------------------------
  //
  // How do I know which future failed(throw exception), when the callback comes here ?
  // Thanks for the help! Different futures using different exceptions can solve it.
}

Now I have a future list like List[Future[T]], and first I transfer it to Future[List[T]] using this method (Why does this list-of-futures to future-of-list transformation compile and work?). then I get the future

val fList: Future[List[T]]
fList on Failure {
  // 
  // How do I know which is Fail now >??
}
like image 304
hliu Avatar asked Dec 02 '22 18:12

hliu


2 Answers

Consider the code:

def func = {
  try {
    val x = maybeThrows
    val y = maybeThrowsToo
    val z = maybeThrowsAsWell
    result(x, y, x)
  } catch (RuntimeException e) {
    // How do I know which maybeThrows failed?
  }
}

The Future case works essentially in the same way.


Even grouping computations in List doesn't help:

def func = {
  try {
    val x = maybeThrows
    val y = maybeThrowsToo
    val z = maybeThrowsAsWell
    val list = List(x, y, z)
    result(list)
  } catch (RuntimeException e) {
    // How do I know which maybeThrows failed?
  }
}

Spoiler: you have to track explicityl which computation failed. It would result in some boilerplate if done with try/catch. But luckily with Future (and Try) the boilerplate isn't that bad:

class TaggedException(val idx, exc: Exception)

def tagFailedWithIndex[T](idx: Int, f: Future[T]): Future[T] = 
  future recoverWith { case exc => Future.failed(new TaggedException(idx, exc)) }

val comF = for {
  f1 <- tagFailedWithIndex(0, future1)
  f2 <- tagFailedWithIndex(1, future2)
  f3 <- tagFailedWithIndex(2, future3)
} yield something(f1, f2, f3)

comF onFailure { 
  case exc: TaggedException => "%d computation failed".format(exc.idx)
}

Spoiler you have to track which computation failed explicitly. It would result in a lot of boilerplate if done with try/catch. But luckily there is Try, and Future behaves even more the same:

class TaggedException(val idx, exc: Exception)

def tagFailedWithIndex[T](idx: Int, f: Future[T]): Future[T] = 
  future recoverWith { case exc => Future.failed(new TaggedException(idx, exc)) }

val comF = for {
  f1 <- tagFailedWithIndex(0, future1)
  f2 <- tagFailedWithIndex(1, future2)
  f3 <- tagFailedWithIndex(2, future3)
} yield something(f1, f2, f3)

comF onFailure { 
  case exc: TaggedException => "%d computation failed".format(exc.idx)
}
like image 138
phadej Avatar answered Dec 28 '22 23:12

phadej


The problem: flatMap combines the Futures into one single Future

You are using flatMap, so the futures are nested into one single future.

Your code is

import scala.concurrent.Future

val future1, future2, future3 = Future[Any]()

val comF = for {
  f1 <- future1
  f2 <- future2
  f3 <- future3
} yield {
  // something
}

comF onSuccess { ??? }
comF onFailure { ??? }

When I apply the de-sugaring rules to your for-comprehension, I get

val comF = (future1).flatMap { case f1 => 
  (future2).flatMap { case f2 => 
    (future3).map { case f3 => {
        // something
      }
    }
  }
}

You can clearly see the use of flatMap here. In principle flatMap does two things: It applies a function to the result of the first Future. This must be a function that maps the result of the first Future to another Future, i.e. a Future nested in the first Future (this is the map part). Then it 'unnestes' the two Futures and merges them in one single Future (the flat part). On this point, the two Futures don't exist any more (from a conceptual point of view; technically they are still there). Instead there is only one 'merged' Future.

The two calls to flatMap create a new Future out of the original three. This is the reason why you cannot find out, which of the three original Futures raises the exception: It's none of them. Instead the newly created Future runs and raises the exception.

Solution: Track your progress separately

If you want to know, which steps of your calculation ran before one raised an exception, you must track the progress separately, e.g. through an additional parameter in the Exception. Alternatively you can remove flatMap and run the individual Futures one after the other.

An example on how to track your progress:

First we create a new exception class, that contains the real exception together with some information, where the exception came from

class ExceptionWithOrigin[T](val origin : T, nested : Throwable) extends Exception(nested)

object ExceptionWithOrigin {
  def wrapFuture[T,U](origin : T, f : Future[U]) : Future[U] = {
    f.transform(Predef.identity, new ExceptionWithOrigin[T](origin,_))
  }

  def wrapFuture[U](f : Future[U]) = wrapFuture(f,f)
}

For the Futures we have no special requirements.

val future1,future2,future3 = Future[Any]()

We then wrap up the given Futures using the helper method from the companion object of our newly created exception class.

import ExceptionWithOrigin._

val comF = for {
  result1 <- wrapFuture(future1)
  result2 <- wrapFuture(future2)
  result3 <- wrapFuture(future3)
} yield {
  // something
}

When you catch some exception ex, you can now just use ex.origin to find out where it came from. Of course, the origin is not quite correct. The original Futures future1, future2 and future3 are not really executed. Instead a newly created Future runs, created by the flatMap. But nevertheless the origin still works.

A hint about better naming

Btw, you should rename f1, f2 and f3 to result1, result2 and result3. They don't represent a Future, but the result of the calculation of each Future (the value each Future returns).

like image 36
stefan.schwetschke Avatar answered Dec 28 '22 23:12

stefan.schwetschke