Suppose I have a function, which invokes a blocking interruptible operation. I would like to run it asynchronously with a timeout. That is, I would like to interrupt the function when the timeout is expired.
So I am trying to do something like that:
import scala.util.Try import scala.concurrent.Future def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = { val aref = new java.util.concurrent.atomic.AtomicReference[Thread]() import ExecutionContext.Implicits.global Future {Thread.sleep(timeout); aref.get().interrupt} // 1 Future {aref.set(Thread.currentThread); Try(f())} // 2 }
The problem is that aref
in (1) can be null because (2) has not set it to the current thread yet. In this case I would like to wait until aref
is set. What is the best way to do that ?
The solution to this problem is to use akka after pattern which is a non-blocking approach to provide timeout to future. According to Akka documentation, after pattern will return a future with success or failure according to the value specified after the duration specified in after pattern.
A Future is a placeholder object for a value that may not yet exist. Generally, the value of the Future is supplied concurrently and can subsequently be used. Composing concurrent tasks in this way tends to result in faster, asynchronous, non-blocking parallel code.
An ExecutionContext can execute program logic asynchronously, typically but not necessarily on a thread pool. A general purpose ExecutionContext must be asynchronous in executing any Runnable that is passed into its execute -method.
You can go for a slightly easier approach using Await. The Await.result
method takes timeout duration as a second parameter and throws a TimeoutException
on timeout.
try { import scala.concurrent.duration._ Await.result(aref, 10 seconds); } catch { case e: TimeoutException => // whatever you want to do. }
I needed the same behavior as well, so this is how I solved it. I basically created an object that creates a timer and fails the promise with a TimeoutException if the future hasn't completed in the specified duration.
package mypackage import scala.concurrent.{Promise, Future} import scala.concurrent.duration.FiniteDuration import akka.actor.ActorSystem import scala.concurrent.ExecutionContext.Implicits.global object TimeoutFuture { val actorSystem = ActorSystem("myActorSystem") def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = { val promise = Promise[A]() actorSystem.scheduler.scheduleOnce(timeout) { promise tryFailure new java.util.concurrent.TimeoutException } Future { try { promise success block } catch { case e:Throwable => promise failure e } } promise.future } }
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With