I have a chain of asynchronous service calls which I would like to cancel. Well, actually, I have two chains of service calls going in parallel, and if one succeeds, I would like to cancel the other.
With Guava's futures I'm accustomed to canceling an entire chain of futures by canceling the last one. It appears I cannot do this with Java 8's futures, unless someone has a good idea of how.
Your task, should you choose to accept it, is to tell me if I can keep my pretty syntax AND have the chain cancellation. Otherwise, I'll be writing my own chaining future wrapper - especially after this question.
My own tests and attempts follow.
@Test
public void shouldCancelOtherFutures() {
// guava
ListenableFuture<String> as = Futures.immediateFuture("a");
ListenableFuture<String> bs = Futures.transform(as, (AsyncFunction<String, String>) x -> SettableFuture.create());
ListenableFuture<String> cs = Futures.transform(bs, (AsyncFunction<String, String>) x -> SettableFuture.create());
ListenableFuture<String> ds = Futures.transform(cs, Functions.<String>identity());
ds.cancel(false);
assertTrue(cs.isDone()); // succeeds
// jdk 8
CompletableFuture<String> ac = CompletableFuture.completedFuture("a");
CompletableFuture<String> bc = ac.thenCompose(x -> new CompletableFuture<>());
CompletableFuture<String> cc = bc.thenCompose(x -> new CompletableFuture<>());
CompletableFuture<String> dc = cc.thenApply(Function.identity());
dc.cancel(false);
assertTrue(cc.isDone()); // fails
}
(Imagine that each thenCompose() and Futures.transform(x, AsyncFunction) represents an asynchronous service call.)
I can see why Doug Lea's army of grad students did it this way. With branching chains, should everything be canceled?
CompletableFuture<Z> top = new CompletableFuture<>()
.thenApply(x -> y(x))
.thenCompose(y -> z(y));
CompletableFuture<?> aBranch = top.thenCompose(z -> aa(z));
CompletableFuture<?> bBranch = top.thenCompose(z -> bb(z));
...
bBranch.cancel(false);
// should aBranch be canceled now?
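To make the branching question concrete, here is a minimal sketch of what the JDK does today. The class name BranchCancelDemo and the string-appending functions are made up for illustration; only the CompletableFuture calls are real API.

```java
import java.util.concurrent.CompletableFuture;

class BranchCancelDemo {
    /** Returns { top.isDone(), aBranch.isDone(), bBranch.isCancelled() } after cancelling bBranch. */
    static boolean[] cancelOneBranch() {
        // top is never completed in this sketch; it stands in for a pending upstream stage
        CompletableFuture<String> top = new CompletableFuture<>();
        CompletableFuture<String> aBranch = top.thenApply(s -> s + "a");
        CompletableFuture<String> bBranch = top.thenApply(s -> s + "b");

        // cancel() completes only bBranch itself; it does not reach back to top
        bBranch.cancel(false);

        return new boolean[] { top.isDone(), aBranch.isDone(), bBranch.isCancelled() };
    }
}
```

So cancelling one branch leaves both the shared source and the sibling branch pending.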
I can get around the problem with a custom wrapper function, but it messes up the pretty syntax.
private <T,U> CompletableFuture<U> transformAsync(CompletableFuture<T> source, Function<? super T,? extends CompletableFuture<U>> transform) {
CompletableFuture<U> next = source.thenCompose(transform);
// once next is done for any reason (including cancellation), cancel the upstream stage
next.whenComplete((x, err) -> source.cancel(false));
return next;
}
private <T,U> CompletableFuture<U> transform(CompletableFuture<T> source, Function<T,U> transform) {
CompletableFuture<U> next = source.thenApply(transform);
// once next is done for any reason (including cancellation), cancel the upstream stage
next.whenComplete((x, err) -> source.cancel(false));
return next;
}
// nice syntax I wish worked
CompletableFuture<?> f1 = serviceCall()
.thenApply(w -> x(w))
.thenCompose(x -> serviceCall())
.thenCompose(y -> serviceCall())
.thenApply(z -> $(z));
// what works, with less readable syntax
CompletableFuture<?> f2 =
transform(
transformAsync(
transformAsync(
transform(serviceCall(), w -> x(w)),
x -> serviceCall()),
y -> serviceCall()),
z -> $(z));
It depends on what your goal is. I assume that it is not important for your intermediate CompletableFutures to report a completed state, as you usually won’t notice that when using chained construction. The important point is that you don’t want your expensive serviceCall() to be triggered.
One solution can be:
CompletableFuture<String> flag=new CompletableFuture<>();
CompletableFuture<String> ac = serviceCall()
.thenCompose(x -> flag.isCancelled()? flag: serviceCall())
.thenCompose(x -> flag.isCancelled()? flag: serviceCall());
ac.whenComplete((v,t)->flag.cancel(false));// don’t chain this call
This uses the whenComplete call like in your solution, but only on the final CompletableFuture, to propagate the cancellation to a dedicated flag object. After cancel is called, the next thenCompose invocation will detect the cancellation and return the cancelled future, so the cancellation propagates down the chain and no further compose or apply functions are invoked.
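A small runnable sketch of that behaviour, under some assumptions: serviceCall() here is a hypothetical stub that hands out a pending future and records it in a list, so the demo can complete calls by hand and count how many were started. The class and field names are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class FlagCancelDemo {
    // Hypothetical stand-in for the real service: every started call is recorded here
    static final List<CompletableFuture<String>> pending = new ArrayList<>();

    static CompletableFuture<String> serviceCall() {
        CompletableFuture<String> f = new CompletableFuture<>();
        pending.add(f);
        return f;
    }

    /** Returns the total number of service calls that were started. */
    static int run() {
        pending.clear();
        CompletableFuture<String> flag = new CompletableFuture<>();
        CompletableFuture<String> ac = serviceCall()
            .thenCompose(x -> flag.isCancelled() ? flag : serviceCall())
            .thenCompose(x -> flag.isCancelled() ? flag : serviceCall());
        ac.whenComplete((v, t) -> flag.cancel(false)); // not chained, as in the answer

        ac.cancel(false);              // cancel the tail while the first call is in flight
        pending.get(0).complete("a");  // the first call finishes afterwards
        return pending.size();         // the second and third calls were never started
    }
}
```

Here run() should report a single started call: the first thenCompose sees the cancelled flag and returns it instead of launching the next call.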
The drawback is that this cannot be combined with thenApply, as the Function can’t return a cancelled future. Therefore, when the asynchronous service call completes and is chained with a Function, the function will be applied even if a cancellation occurred.
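The drawback can be seen in a short sketch. The names are illustrative only: first stands in for an in-flight service call, and the counter just records whether the thenApply function ran.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class ThenApplyDrawbackDemo {
    /** Returns how often the thenApply function ran although the chain had been cancelled. */
    static int run() {
        AtomicInteger applied = new AtomicInteger();
        CompletableFuture<String> first = new CompletableFuture<>(); // hypothetical in-flight call
        CompletableFuture<String> flag = new CompletableFuture<>();

        CompletableFuture<String> ac = first
            .thenApply(w -> { applied.incrementAndGet(); return w + "!"; })
            .thenCompose(x -> flag.isCancelled() ? flag : new CompletableFuture<String>());
        ac.whenComplete((v, t) -> flag.cancel(false));

        ac.cancel(false);      // cancellation happens first
        first.complete("w");   // then the in-flight call completes
        return applied.get();  // the plain Function had no way to check the flag
    }
}
```

The thenApply function still runs once here; only the following thenCompose stage gets a chance to notice the cancelled flag.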
The alternative solution, which solves this issue as well, is to create a wrapper function for your serviceCall which includes a test before launching and after completion:
CompletableFuture<String> serviceCall(CompletableFuture<String> f) {
if(f.isCancelled()) return f;
CompletableFuture<String> serviceCall=serviceCall();
return serviceCall.thenCompose(x->f.isCancelled()? f: serviceCall);
}
Then your use case will look like:
CompletableFuture<String> flag=new CompletableFuture<>();
CompletableFuture<String> ac = serviceCall(flag)
.thenApply(w->x(w))
.thenCompose(x -> serviceCall(flag))
.thenCompose(x -> serviceCall(flag))
.thenApply(z -> $(z));
ac.whenComplete((v,t)->flag.cancel(false));
Of course, you have to replace <String> with whatever type parameter your original serviceCall() uses for the CompletableFuture<T>.
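For the original goal (two chains in parallel, where the first success cancels the other), one possible way to put the pieces together is sketched below. This is only an illustration under assumptions: serviceCall() is a hypothetical stub whose futures are completed by hand, chain(...) and the thenRun wiring between the two tails are my own additions, and the upper-casing Function is a placeholder for real work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class RaceDemo {
    // Hypothetical service stub: records every started call so the demo can complete it
    static final List<CompletableFuture<String>> started = new ArrayList<>();

    static CompletableFuture<String> serviceCall() {
        CompletableFuture<String> f = new CompletableFuture<>();
        started.add(f);
        return f;
    }

    // Wrapper as in the answer: test the flag before launching and after completion
    static CompletableFuture<String> serviceCall(CompletableFuture<String> flag) {
        if (flag.isCancelled()) return flag;
        CompletableFuture<String> call = serviceCall();
        return call.thenCompose(x -> flag.isCancelled() ? flag : call);
    }

    // One chain of two service calls plus a final transformation, with its own flag
    static CompletableFuture<String> chain(CompletableFuture<String> flag) {
        CompletableFuture<String> tail = serviceCall(flag)
            .thenCompose(x -> serviceCall(flag))
            .thenApply(z -> z.toUpperCase());
        tail.whenComplete((v, t) -> flag.cancel(false));
        return tail;
    }

    /** Returns "result of A : started calls : whether B was cancelled". */
    static String run() {
        started.clear();
        CompletableFuture<String> a = chain(new CompletableFuture<>()); // starts call 0
        CompletableFuture<String> b = chain(new CompletableFuture<>()); // starts call 1
        // thenRun fires only on normal completion, so the first success cancels the other tail
        a.thenRun(() -> b.cancel(false));
        b.thenRun(() -> a.cancel(false));

        started.get(0).complete("a1"); // chain A, step 1: launches call 2
        started.get(2).complete("a2"); // chain A succeeds and cancels B
        started.get(1).complete("b1"); // chain B, step 1 finishes too late
        return a.join() + ":" + started.size() + ":" + b.isCancelled();
    }
}
```

In this run chain B's second service call is never launched, because its wrapper sees the cancelled flag once its first call returns.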