Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

What's the intended pattern of canceling already running CompletableFutures

I couldn't find any information in "Java 8 in Action" on why CompletableFuture purposefully ignores mayInterruptIfRunning. But even if so, I don't really see any hook for custom cancel(boolean), which would come in handy in cases, where interruption doesn't affect a blocking operation (such as I/O streams, simple locks, and so on). So far it seems that tasks themselves are the intended hooks and working on the abstraction level of Future simply won't do any good here.

Therefore I'm asking about the minimum set of boilerplate code one has to introduce to squeeze some neet custom cancellation mechanism out of this situation.

like image 852
vitrums Avatar asked Oct 17 '25 13:10

vitrums


1 Answers

A CompletableFuture is an object encapsulating one of three state:

  1. Not completed,
  2. Completed with a value, or
  3. Completed exceptionally

The transition from “Not completed” to one of the other states can be triggered by a function passed to one of its factory methods or to one of the CompletionStage implementation methods.

But it’s also possible to invoke complete or completeExceptionally on it. In that regard, calling cancel on it has the same effect as calling completeExceptionally(new CancellationException()) on it.

That’s also what its class name suggests, it’s a future that can be completed rather than will [eventually] be completed. The class does not offer control over the existing completion attempts that may run in arbitrary threads and it doesn’t treat the completion attempts scheduled by itself specially.

It’s also important to understand that when chaining operations via the CompletionStage implementation methods, the resulting future only represents the last stage of the chain and cancelling it also can only affect the last stage.

E.g. the following code

CompletableFuture<?> cf = CompletableFuture.supplyAsync(() -> {
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        String s = "value1";
        System.out.println("return initial " + s);
        return s;
    }).thenApply(s -> {
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        s = s.toUpperCase();
        System.out.println("return transformed " + s);
        return s;
    }).thenAccept(s -> {
        System.out.println("starting last stage");
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        System.out.println("processed " + s);
    });
cf.cancel(false);
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);

will print

return initial value1
return transformed VALUE1

demonstrating that only the last stage of the chain has been cancelled whereas the preceding stage ran to completion, entirely unaffected.

Keeping a reference to the first stage in an attempt to cancel the entire chain would only work as long as the first stage has not completed, as an attempt to cancel an already completed future has no effect.

long[] waitBeforeCancel = { 500, 1500 };
for(long l: waitBeforeCancel) {
    CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        String s = "value1";
        System.out.println("return initial " + s);
        return s;
    });
    first.thenApply(s -> {
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
            s = s.toUpperCase();
            System.out.println("return transformed " + s);
            return s;
        }).thenAccept(s -> {
            System.out.println("starting last stage");
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
            System.out.println("processed " + s);
        });
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(l));
    System.out.println("Trying to cancel");
    first.cancel(false);
    ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);
    System.out.println();
}
Trying to cancel
return initial value1

return initial value1
Trying to cancel
return transformed VALUE1
starting last stage
processed VALUE1

This demonstrates that the entire chain gets cancelled when the first stage is cancelled in time (except that the Supplier’s code of the first still completes due to the nonexistent interruption), whereas cancelling too late will not affect any stage.

Remembering all CompletableFuture instances, to be able to cancel all of them, would defeat the purpose of the API. You can use an executor that tracks all currently processed jobs, to forward cancellation and interruption to them when the last stage has been cancelled. The CompletableFuture implementation will forward the cancellation to dependent stages then. That way, completed stages still can be garbage collected.

The setup is a bit involved; the wrapper executor is needed before-hand to construct the CompletableFuture chain and the forwarding of the cancellation needs the last stage of the already constructed chain. That’s why I made a utility method accepting the chain construction code as a Function<Executor,CompletableFuture<T>>:

static <T> Future<T> setupForInterruption(Function<Executor,CompletableFuture<T>> f) {
    return setupForInterruption(f, ForkJoinPool.commonPool());
}
static <T> Future<T> setupForInterruption(
        Function<Executor,CompletableFuture<T>> f, Executor e) {

    AtomicBoolean dontAcceptMore = new AtomicBoolean();
    Set<Future<?>> running = ConcurrentHashMap.newKeySet();
    Executor wrapper = r -> {
        if(dontAcceptMore.get()) throw new CancellationException();
        FutureTask<?> ft = new FutureTask<>(r, null) {
            @Override protected void done() { running.remove(this); }
        };
        running.add(ft);
        e.execute(ft);
    };
    CompletableFuture<T> cf = f.apply(wrapper);
    cf.whenComplete((v,t) -> {
        if(cf.isCancelled()) {
            dontAcceptMore.set(true);
            running.removeIf(ft -> ft.cancel(true) || ft.isDone());
        }
    });
    return cf;
}

This can be used like

long[] waitBeforeCancel = { 500, 1500, 2500, 3500 };
for(long l: waitBeforeCancel) {
    Future<?> f = setupForInterruption(executor ->
        CompletableFuture.supplyAsync(() -> {
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
                if(Thread.interrupted()) throw new IllegalStateException();
                String s = "value1";
                System.out.println("return initial " + s);
                return s;
            }, executor).thenApplyAsync(s -> {
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
                if(Thread.interrupted()) throw new IllegalStateException();
                s = s.toUpperCase();
                System.out.println("return transformed " + s);
                return s;
            }, executor).thenAcceptAsync(s -> {
                System.out.println("starting last stage");
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
                if(Thread.interrupted()) throw new IllegalStateException();
                System.out.println("processed " + s);
            }, executor));
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(l));
    System.out.println("Trying to cancel");
    f.cancel(true);
    ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);
    System.out.println();
}
Trying to cancel

return initial value1
Trying to cancel

return initial value1
return transformed VALUE1
starting last stage
Trying to cancel

return initial value1
return transformed VALUE1
starting last stage
processed VALUE1
Trying to cancel

Since the API uses Supplier, Function, and Consumer, none of them being allowed to throw InterruptedException, this example code bears explicit test for interruption and throws IllegalStateException instead. That’s also the reason it uses parkNanos which just immediately returns on interruption instead of Thread.sleep in the first place. In real application scenarios, you’ll likely call interruption sensitive methods and have to catch InterruptedException, InterruptedIOException, or InterruptedNamingException (etc.) and convert them to an unchecked exception.

Note that above methods will always cancel with interruption, as the CompletableFuture does not tell whether the cancellation was with interruption or not. If you want to get the value of this parameter, you need a front-end Future implementation that reflects the result of the last stage, forwards cancellation to it, but passes the value of the mayInterruptIfRunning to the currently running jobs.

class FrontEnd<R> implements Future<R> {
    final CompletableFuture<R> lastStage;
    final Set<Future<?>> running;

    FrontEnd(CompletableFuture<R> lastStage, Set<Future<?>> running) {
        this.lastStage = lastStage;
        this.running = running;
    }
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean didCancel = lastStage.cancel(false);
        if(didCancel)
            running.removeIf(f -> f.cancel(mayInterruptIfRunning) || f.isDone());
        return didCancel;
    }
    @Override
    public boolean isCancelled() {
        return lastStage.isCancelled();
    }
    @Override
    public boolean isDone() {
        return lastStage.isDone();
    }
    @Override
    public R get() throws InterruptedException, ExecutionException {
        return lastStage.get();
    }
    @Override
    public R get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {

        return lastStage.get(timeout, unit);
    }

    static <T> Future<T> setup(Function<Executor,CompletableFuture<T>> f) {
        return setup(f, ForkJoinPool.commonPool());
    }
    static <T> Future<T> setup(Function<Executor,CompletableFuture<T>> f, Executor e) {
        AtomicBoolean dontAcceptMore = new AtomicBoolean();
        Set<Future<?>> running = ConcurrentHashMap.newKeySet();
        Executor wrapper = r -> {
            if(dontAcceptMore.get()) throw new CancellationException();
            FutureTask<?> ft = new FutureTask<>(r, null) {
                @Override protected void done() { running.remove(this); }
            };
            running.add(ft);
            e.execute(ft);
        };
        CompletableFuture<T> cf = f.apply(wrapper);
        cf.whenComplete((v,t) -> { if(cf.isCancelled()) dontAcceptMore.set(true); });
        return new FrontEnd<>(cf, running);
    }
}
like image 188
Holger Avatar answered Oct 20 '25 02:10

Holger



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!