
Cancellation with Future and Promise in Scala

This is a follow-up to my previous question.

Suppose I have a task that executes an interruptible blocking call. I would like to run it as a Future and cancel it with the failure method of Promise.

I would like the cancel to work as follows:

  • If the task is cancelled before it has finished, it should finish "immediately", interrupting the blocking call if that call has already started, and the Future should invoke onFailure.

  • If the task is cancelled after it has finished, I would like to get a status saying that the cancel failed because the task had already finished.

Does this make sense? Is it possible to implement this in Scala? Are there any examples of such implementations?

Asked by Michael, Apr 15 '13 17:04




2 Answers

Here is the interruptible version of Victor's code, based on his comments (Victor, please correct me if I misinterpreted them).

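import java.util.concurrent.{CancellationException, CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global

object CancellableFuture extends App {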

  def interruptableFuture[T](fun: () => T)(implicit ex: ExecutionContext): (Future[T], () => Boolean) = {
    val p = Promise[T]()
    val f = p.future
    val aref = new AtomicReference[Thread](null)
    p tryCompleteWith Future {
      val thread = Thread.currentThread
      aref.synchronized { aref.set(thread) }
      try fun() finally {
        val wasInterrupted = (aref.synchronized { aref getAndSet null }) ne thread
        // Deal with the interrupted flag of this thread as desired
      }
    }

    (f, () => {
      aref.synchronized { Option(aref getAndSet null) foreach { _.interrupt() } }
      p.tryFailure(new CancellationException)
    })
  }

  val (f, cancel) = interruptableFuture[Int] { () =>
    val latch = new CountDownLatch(1)

    latch.await(5, TimeUnit.SECONDS)    // Blocks for 5 sec, is interruptible
    println("latch timed out")

    42  // Completed
  }

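  // Note: onFailure/onSuccess are deprecated since Scala 2.12 and removed in 2.13;
  // on newer versions use f.failed.foreach { ... } and f.foreach { ... } instead.
  f.onFailure { case ex => println(ex.getClass) }
  f.onSuccess { case i => println(i) }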

  Thread.sleep(6000)   // Set to less than 5000 to cancel

  val wasCancelled = cancel()

  println("wasCancelled: " + wasCancelled)
}

With Thread.sleep(6000) the output is:

latch timed out
42
wasCancelled: false

With Thread.sleep(1000) the output is:

wasCancelled: true
class java.util.concurrent.CancellationException
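
The wasCancelled status in both runs comes from Promise.tryFailure, which reports whether that particular call was the one that completed the promise. A minimal sketch of just that piece, with no threads involved (the object and value names are made up for illustration). It also shows why the plain failure method mentioned in the question is a poor fit for a "cancel" status: it throws instead of returning false.

```scala
import java.util.concurrent.CancellationException
import scala.concurrent.Promise
import scala.util.Try

// Illustrative only: TryFailureStatus and demo() are hypothetical names.
object TryFailureStatus {
  // Returns (cancel before completion, cancel after completion, did `failure` throw)
  def demo(): (Boolean, Boolean, Boolean) = {
    // Case 1: the promise is still pending, so the cancel wins.
    val pending = Promise[Int]()
    val cancelledInTime = pending.tryFailure(new CancellationException)

    // Case 2: the task already delivered its result, so the cancel loses.
    val finished = Promise[Int]()
    finished.success(42)
    val cancelledTooLate = finished.tryFailure(new CancellationException)

    // `failure` on a completed promise throws IllegalStateException
    // rather than reporting a status.
    val failureThrew = Try(finished.failure(new CancellationException)).isFailure

    (cancelledInTime, cancelledTooLate, failureThrew)
  }
}
```

Calling TryFailureStatus.demo() yields a true status for the pending promise and a false status for the completed one, which matches the two outputs above.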

Answered by sourcedelica, Oct 04 '22 08:10


Twitter's futures implement cancellation. Have a look here:

https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala

Line 563 of that file (at the time of writing) shows the abstract method responsible for this. Scala's standard library futures currently do not support cancellation.
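
Since the standard library offers no cancel on Future, the cancel handle has to be built around a Promise, along the lines of the first answer. The following is a condensed sketch of that idea, assuming Scala 2.12 or later; the names interruptibleFuture, lock and worker are made up for illustration, and this is one possible arrangement rather than an established API. One known limitation: if cancel is called before the body has started running, the body still runs to completion and its result is discarded.

```scala
import java.util.concurrent.CancellationException
import scala.concurrent.{ExecutionContext, Future, Promise}

object InterruptibleSketch {
  // Hypothetical helper, not part of the standard library.
  // Returns the future plus a cancel function that reports whether it won.
  def interruptibleFuture[T](body: => T)(implicit ec: ExecutionContext): (Future[T], () => Boolean) = {
    val p = Promise[T]()
    val lock = new Object
    var worker: Thread = null // thread currently running body; guarded by lock

    val work = Future {
      lock.synchronized { worker = Thread.currentThread }
      try body
      catch {
        // Translate the interrupt so the pool thread is not re-interrupted.
        case _: InterruptedException => throw new CancellationException
      }
      finally lock.synchronized {
        worker = null
        Thread.interrupted() // clear a late interrupt so it does not leak into the pool
      }
    }
    p.completeWith(work) // has no effect if cancel() already failed the promise

    val cancel = () => {
      // Only the call that actually completes the promise interrupts the worker.
      val won = p.tryFailure(new CancellationException)
      if (won) lock.synchronized { if (worker != null) worker.interrupt() }
      won
    }
    (p.future, cancel)
  }
}
```

With an implicit ExecutionContext in scope, usage would look like val (f, cancel) = InterruptibleSketch.interruptibleFuture { blockingCall() }, where blockingCall stands for any interruptible operation. Callbacks can then be attached with f.onComplete, and cancel() returns false once the task has already finished.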

Answered by gzm0, Oct 04 '22 08:10