Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Canceling a CompletableFuture chain

Tags:

java

java-8

I have a chain of asynchronous service calls which I would like to cancel. Well, actually, I have two chains of service calls going in parallel, and if one succeeds, I would like to cancel the other.

With guava's futures I'm accustomed to canceling an entire chain of Futures by canceling the last one. It appears I cannot do this with java-8's futures. Unless someone has a good idea of how.

You're task, should you choose to accept it, is to tell me if I can keep my pretty syntax AND have the chain cancellation. Otherwise, I'll be writing my own chaining future wrapper - especially after this question.


My own tests and attempts follow.

@Test
public void shouldCancelOtherFutures() {
    // guava
    ListenableFuture<String> as = Futures.immediateFuture("a");
    ListenableFuture<String> bs = Futures.transform(as, (AsyncFunction<String, String>) x -> SettableFuture.create());
    ListenableFuture<String> cs = Futures.transform(bs, (AsyncFunction<String, String>) x -> SettableFuture.create());
    ListenableFuture<String> ds = Futures.transform(cs, Functions.<String>identity());

    ds.cancel(false);
    assertTrue(cs.isDone()); // succeeds

    // jdk 8
    CompletableFuture<String> ac = CompletableFuture.completedFuture("a");
    CompletableFuture<String> bc = ac.thenCompose(x -> new CompletableFuture<>());
    CompletableFuture<String> cc = bc.thenCompose(x -> new CompletableFuture<>());
    CompletableFuture<String> dc = cc.thenApply(Function.identity());

    dc.cancel(false);
    assertTrue(cc.isDone()); // fails
}

(Imagine that each thenCompose() and Futures.transform(x, AsyncFunction) represents an asynchronous service call.)

I can see why Doug Lee's army of grad students did it this way. With branching chains, should everything be canceled?

CompletableFuture<Z> top = new CompletableFuture<>()
    .thenApply(x -> y(x))
    .thenCompose(y -> z(y));

CompletableFuture<?> aBranch = top.thenCompose(z -> aa(z));
CompletableFuture<?> bBranch = top.thenCompose(z -> bb(z));

...
bBranch.cancel(false);
// should aBranch be canceled now?

I can get around the problem with a custom wrapper function, but it messes up the pretty syntax.

private <T,U> CompletableFuture<U> transformAsync(CompletableFuture<T> source, Function<? super T,? extends CompletableFuture<U>> transform) {
    CompletableFuture<U> next = source.thenCompose(transform);
    next.whenComplete((x, err) -> next.cancel(false));
    return next;
}

private <T,U> CompletableFuture<U> transform(CompletableFuture<T> source, Function<T,U> transform) {
    CompletableFuture<U> next = source.thenApply(transform);
    next.whenComplete((x, err) -> next.cancel(false));
    return next;
}

// nice syntax I wished worked
CompletableFuture<?> f1 = serviceCall()
        .thenApply(w -> x(w))
        .thenCompose(x -> serviceCall())
        .thenCompose(y -> serviceCall())
        .thenApply(z -> $(z));

// what works, with less readable syntax
CompletableFuture<?> f2 =
        transform(
            transformAsync(
                transformAsync(
                    transform(serviceCall, x(w)),
                    x -> serviceCall()),
                y -> serviceCall()),
            z -> $(z));
like image 892
Michael Deardeuff Avatar asked Aug 21 '14 03:08

Michael Deardeuff


People also ask

How do I interrupt CompletableFuture?

As such, there's nothing you can do through CompletableFuture to interrupt any thread that may be running some task that will complete it. You'll have to write your own logic which tracks any Thread instances which acquire a reference to the CompletableFuture with the intention to complete it.

Is Completable future get blocking?

It just provides a get() method which blocks until the result is available to the main thread. Ultimately, it restricts users from applying any further action on the result. You can create an asynchronous workflow with CompletableFuture. It allows chaining multiple APIs, sending ones to result to another.

Is Completable future reactive?

Ans. A CompletableFuture represents one result of an asynchronous call, while Reactive Streams is a pattern for pushing N messages synchronously/asynchronously through a system. CompletableFuture doesn't address the elasticity requirement of the Reactive Manifesto as it does not handle backpressure.

What does CompletableFuture return?

toCompletableFuture() is an instance method of the CompletableFuture class. It is used to return the same completable future upon which this method is invoked.


1 Answers

It depends on what your goal is. I assume, that it is not important to have your intermediate CompletableFutures report a completed state as you usually won’t notice when using chained construction call. The important point is that you wish your expensive serviceCall() not to be triggered.

One solution can be:

CompletableFuture<String> flag=new CompletableFuture<>();

CompletableFuture<String> ac = serviceCall()
  .thenCompose(x -> flag.isCancelled()? flag: serviceCall())
  .thenCompose(x -> flag.isCancelled()? flag: serviceCall());
ac.whenComplete((v,t)->flag.cancel(false));// don’t chain this call

This uses the whenComplete call like in your solution but only on the final CompletableFuture to propagate the cancellation to a dedicated flag object. After calling cancel the next thenCompose invocation will detect the cancellation and return the cancelled future, thus the cancellation will propagate the chain then so no more compose or apply methods are invoked.

The drawback is that this can not combined with thenApply as the Function can’t return a cancelled future. Therefore, when the asynchronous service call completes and is chained with a Function, the function will be applied even if a cancellation occurred.

The alternate solution solving this issue as well is to create a wrapper function for your serviceCall which includes a test before launching and after completion:

CompletableFuture<String> serviceCall(CompletableFuture<String> f) {
    if(f.isCancelled()) return f;
    CompletableFuture<String> serviceCall=serviceCall();
    return serviceCall.thenCompose(x->f.isCancelled()? f: serviceCall);
}

Then your use case will look like:

CompletableFuture<String> flag=new CompletableFuture<>();

CompletableFuture<String> ac = serviceCall(flag)
  .thenApply(w->x(w))
  .thenCompose(x -> serviceCall(flag))
  .thenCompose(x -> serviceCall(flag))
  .thenApply(z -> $(z));
ac.whenComplete((v,t)->flag.cancel(false));

Of course, you have to replace <String> with whatever type parameter your original serviceCall() uses for the CompletableFuture<T>.

like image 91
Holger Avatar answered Sep 20 '22 16:09

Holger