I am playing with Java 8 completable futures. I have the following code:
CountDownLatch waitLatch = new CountDownLatch(1);
CompletableFuture<?> future = CompletableFuture.runAsync(() -> {
try {
System.out.println("Wait");
waitLatch.await(); //cancel should interrupt
System.out.println("Done");
} catch (InterruptedException e) {
System.out.println("Interrupted");
throw new RuntimeException(e);
}
});
sleep(10); //give it some time to start (ugly, but works)
future.cancel(true);
System.out.println("Cancel called");
assertTrue(future.isCancelled());
assertTrue(future.isDone());
sleep(100); //give it some time to finish
Using runAsync I schedule execution of a code that waits on a latch. Next I cancel the future, expecting an interrupted exception to be thrown inside. But it seems that the thread remains blocked on the await call and the InterruptedException is never thrown even though the future is canceled (assertions pass). An equivalent code using ExecutorService works as expected. Is it a bug in the CompletableFuture or in my example?
Overview. In Java, we use an instance method of the CompletableFuture class, cancel() , that attempts to cancel the execution of a task.
The CompletableFuture. get() method is blocking. It waits until the Future is completed and returns the result after its completion.
A CompletableFuture is an extension to Java's Future API which was introduced in Java 8. A Future is used for asynchronous Programming. It provides two methods, isDone() and get(). The methods retrieve the result of the computation when it completes.
CompletableFuture is a Future . It overrides methods of future, meaning that you can wait for the result of the future, with or without a timeout. You can request the status of the future (whether it's done), etc. Waits if necessary for this future to complete, and then returns its result.
When you call CompletableFuture#cancel
, you only stop the downstream part of the chain. Upstream part, i. e. something that will eventually call complete(...)
or completeExceptionally(...)
, doesn't get any signal that the result is no more needed.
What are those 'upstream' and 'downstream' things?
Let's consider the following code:
CompletableFuture
.supplyAsync(() -> "hello") //1
.thenApply(s -> s + " world!") //2
.thenAccept(s -> System.out.println(s)); //3
Here, the data flows from top to bottom - from being created by supplier, through being modified by function, to being consumed by println
. The part above particular step is called upstream, and the part below is downstream. E. g. steps 1 and 2 are upstream for step 3.
Here's what happens behind the scenes. This is not precise, rather it's a convenient mind model of what's going on.
ForkJoinPool
).complete(...)
to the next CompletableFuture
downstream.CompletableFuture
invokes next step - a function (step 2) which takes in previous step result and returns something that will be passed further, to the downstream CompletableFuture
's complete(...)
.CompletableFuture
invokes the consumer, System.out.println(s)
. After consumer is finished, the downstream CompletableFuture
will receive it's value, (Void) null
As we can see, each CompletableFuture
in this chain has to know who are there downstream waiting for the value to be passed to their's complete(...)
(or completeExceptionally(...)
). But the CompletableFuture
don't have to know anything about it's upstream (or upstreams - there might be several).
Thus, calling cancel()
upon step 3 doesn't abort steps 1 and 2, because there's no link from step 3 to step 2.
It is supposed that if you're using CompletableFuture
then your steps are small enough so that there's no harm if a couple of extra steps will get executed.
If you want cancellation to be propagated upstream, you have two options:
CompletableFuture
(name it like cancelled
) which is checked after every step (something like step.applyToEither(cancelled, Function.identity())
)Apparently, it's intentional. The Javadoc for the method CompletableFuture::cancel states:
[Parameters:] mayInterruptIfRunning - this value has no effect in this implementation because interrupts are not used to control processing.
Interestingly, the method ForkJoinTask::cancel uses almost the same wording for the parameter mayInterruptIfRunning.
I have a guess on this issue:
Instead of blocking, a CompletableFuture should create a new CompletionStage, and cpu-bound tasks are a prerequisite for the fork-join model. So, using interruption with either of them would defeat their purpose. And on the other hand, it might increase complexity, that's not required if used as intended.
If you actually want to be able to cancel a task, then you have to use Future
itself (e.g. as returned by ExecutorService.submit(Callable<T>)
, not CompletableFuture
. As pointed out in the answer by nosid, CompletableFuture
completely ignores any call to cancel(true)
.
My suspicion is that the JDK team did not implement interruption because:
InputStream.read()
being blocking calls! (And the JDK team have no plans to make the standard I/O system interruptible again, like it was in the very early Java days.)Object.finalize()
, Object.wait()
, Thread.stop()
, etc. I believe Thread.interrupt()
is considered to be in the category of things that must be eventually deprecated and replaced. Therefore, newer APIs (like ForkJoinPool
and CompletableFuture
) are already not supporting it.CompletableFuture
was designed for building DAG-structured pipelines of operations, similar to the Java Stream
API. It's very dificult to succinctly describe how interruption of one node of a dataflow DAG should affect execution in the rest of the DAG. (Should all concurrent tasks be canceled immediately, when any node is interrupted?)One very hacky way around this would be to have each CompletableFuture
export a reference to itself to an externally-visible AtomicReference
, then the Thread
reference could be interrupted directly when needed from another external thread. Or if you start all the tasks using your own ExecutorService
, in your own ThreadPool
, you can manually interrupt any or all the threads that were started, even if CompletableFuture
refuses to trigger interruption via cancel(true)
. (Note though that CompletableFuture
lambdas cannot throw checked exceptions, so if you have an interruptible wait in a CompletableFuture
, you'll have to re-throw as an unchecked exception.)
More simply, you could just declare an AtomicReference<Boolean> cancel = new AtomicReference<>()
in an external scope, and periodically check this flag from inside each CompletableFuture
task's lambda.
You could also try setting up a DAG of Future
instances rather than a DAG of CompletableFuture
instances, that way you can exactly specify how exceptions and interruption/cancellation in any one task should affect the other currently-running tasks. I show how to do this in my example code in my question here, and it works well, but it's a lot of boilerplate.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With