Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Scala future sequence and timeout handling

There are some good hints how to combine futures with timeouts. However I'm curious how to do this with Future sequence sequenceOfFutures

My first approach looks like this

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits._

object FutureSequenceScala extends App {
  println("Creating futureList")

  val timeout = 2 seconds
  val futures = List(1000, 1500, 1200, 800, 2000) map { ms =>
    val f = future {
      Thread sleep ms
      ms toString
    }
    Future firstCompletedOf Seq(f, fallback(timeout))
  }

  println("Creating waitinglist")
  val waitingList = Future sequence futures
  println("Created")

  val results = Await result (waitingList, timeout * futures.size)
  println(results)

  def fallback(timeout: Duration) = future {
    Thread sleep (timeout toMillis)
    "-1"
  }
}

Is there a better way to handle timeouts in a sequence of futures or is this a valid solution?

like image 523
Muki Avatar asked Jul 16 '13 09:07

Muki


2 Answers

There are a few things in your code here that you might want to reconsider. For starters, I'm not a huge fan of submitting tasks into the ExecutionContext that have the sole purpose of simulating a timeout and also have Thread.sleep used in them. The sleep call is blocking and you probably want to avoid having a task in the execution context that is purely blocking for the sake of waiting a fixed amount of time. I'm going to steal from my answer here and suggest that for pure timeout handling, you should use something like I outlined in that answer. The HashedWheelTimer is a highly efficient timer implementation that is mush better suited to timeout handling than a task that just sleeps.

Now, if you go that route, the next change I would suggest concerns handling the individual timeout related failures for each future. If you want an individual failure to completely fail the aggregate Future returned from the sequence call, then do nothing extra. If you don't want that to happen, and instead want a timeout to return some default value instead, then you can use recover on the Future like this:

withTimeout(someFuture).recover{
  case ex:TimeoutException => someDefaultValue
}

Once you've done that, you can take advantage of the non-blocking callbacks and do something like this:

waitingList onComplete{
  case Success(results) => //handle success
  case Failure(ex) => //handle fail
}

Each future has a timeout and thus will not just run infinitely. There is no need IMO to block there and provide an additional layer of timeout handling via the atMost param to Await.result. But I guess this assumes you are okay with the non-blocking approach. If you really need to block there, then you should not be waiting timeout * futures.size amount of time. These futures are running in parallel; the timeout there should only need to be as long as the individual timeouts for the futures themselves (or just slightly longer to account for any delays in cpu/timing). It certainly should not be the timeout * the total number of futures.

like image 105
cmbaxter Avatar answered Oct 18 '22 09:10

cmbaxter


Here's a version that shows how bad your blocking fallback is.

Notice that the executor is single threaded and you're creating many fallbacks.

@cmbaxter is right, your master timeout shouldn't be timeout * futures.size, it should be bigger!

@cmbaxter is also right that you want to think non-blocking. Once you do that, and you want to impose timeouts, then you will pick a timer component for that, see his linked answer (also linked from your linked answer).

That said, I still like my answer from your link, in so far as sitting in a loop waiting for the next thing that should timeout is really simple.

It just takes a list of futures and their timeouts and a fallback value.

Maybe there is a use case for that, such as a simple app that just blocks for some results (like your test) and must not exit before results are in.

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext

import java.util.concurrent.Executors
import java.lang.System.{ nanoTime => now }

object Test extends App { 
  //implicit val xc = ExecutionContext.global
  implicit val xc = ExecutionContext fromExecutorService (Executors.newSingleThreadExecutor)

  def timed[A](body: =>A): A = {
    val start = now 
    val res = body
    val end = now
    Console println (Duration fromNanos end-start).toMillis + " " + res
    res
  }
  println("Creating futureList")

  val timeout = 1500 millis
  val futures = List(1000, 1500, 1200, 800, 2000) map { ms =>
    val f = future {
      timed {
        blocking(Thread sleep ms)
        ms toString
      }
    } 
    Future firstCompletedOf Seq(f, fallback(timeout))
  }   

  println("Creating waitinglist")
  val waitingList = Future sequence futures
  println("Created")

  timed {
  val results = Await result (waitingList, 2 * timeout * futures.size)
  println(results)
  }     
  xc.shutdown

  def fallback(timeout: Duration) = future {
    timed {
      blocking(Thread sleep (timeout toMillis))
      "-1"
    }
  }   
}   

What happened:

Creating futureList
Creating waitinglist
Created
1001 1000
1500 -1
1500 1500
1500 -1
1200 1200
1500 -1
800 800
1500 -1
2000 2000
1500 -1
List(1000, 1500, 1200, 800, 2000)
14007 ()
like image 20
som-snytt Avatar answered Oct 18 '22 09:10

som-snytt