
How to subclass CompletableFuture?

I'd like to subclass CompletableFuture to override the default Executor. That is, if a user invokes a method without specifying an Executor, I want my own Executor to get used instead of the one normally used by CompletableFuture.

The Javadoc hints at the possibility of subclassing:

All CompletionStage methods are implemented independently of other public methods, so the behavior of one method is not impacted by overrides of others in subclasses.
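One way to read that sentence is that every method must be overridden individually, because none of them delegates to another public method. The following is only a minimal sketch of that reading; the class name CountingFuture and the helper callsSeen are hypothetical and exist purely for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical subclass that counts calls to a single overridden method.
class CountingFuture<T> extends CompletableFuture<T> {
    final AtomicInteger thenApplyCalls = new AtomicInteger();

    @Override
    public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
        thenApplyCalls.incrementAndGet();
        return super.thenApply(fn);
    }

    // Calls thenApplyAsync once and, optionally, thenApply once;
    // returns how often the overridden thenApply was entered.
    static int callsSeen(boolean alsoCallThenApply) {
        CountingFuture<String> f = new CountingFuture<>();
        f.complete("x");
        f.thenApplyAsync(String::length).join(); // does not route through thenApply
        if (alsoCallThenApply) {
            f.thenApply(String::length).join();
        }
        return f.thenApplyCalls.get();
    }

    public static void main(String[] args) {
        System.out.println(callsSeen(false));
        System.out.println(callsSeen(true));
    }
}
```

If this reading is right, overriding thenApply alone leaves thenApplyAsync untouched, so a subclass that wants to change the executor has to override each Async variant on its own.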

How am I supposed to implement static methods like CompletableFuture.supplyAsync() in the subclass if the underlying implementation depends on methods like internalComplete() which is package-private?

How is one supposed to subclass CompletableFuture?


What I'm trying to do...

My user code needs to execute multiple tasks asynchronously using the same executor. For example: CompletableFuture.supplyAsync(..., executor).thenApplyAsync(..., executor).thenApplyAsync(..., executor). I'd like the custom CompletableFuture implementation to use the first executor throughout all follow-up calls.
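To make the problem concrete, here is a small sketch of what happens today. The class name, the helper stageThreads and the thread name "my-executor" are made up for this illustration. It records which thread runs each stage, once with the executor repeated and once without.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecutorRepetitionDemo {
    // Returns the names of the threads that ran stage 1 and stage 2.
    static String[] stageThreads(boolean passExecutorAgain) {
        // Hypothetical executor whose single thread has a recognizable name.
        ExecutorService executor = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "my-executor"));
        try {
            CompletableFuture<String> first = CompletableFuture.supplyAsync(
                    () -> Thread.currentThread().getName(), executor);
            CompletableFuture<String> second = passExecutorAgain
                    ? first.thenApplyAsync(v -> Thread.currentThread().getName(), executor)
                    : first.thenApplyAsync(v -> Thread.currentThread().getName());
            return new String[] { first.join(), second.join() };
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", stageThreads(true)));
        System.out.println(String.join(", ", stageThreads(false)));
    }
}
```

With the executor repeated, both stages run on "my-executor". Without it, the second stage falls back to the default executor (typically a ForkJoinPool.commonPool() worker), which is exactly what I want to avoid without repeating the argument everywhere.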

Gili asked Oct 26 '14 23:10




1 Answer

Since you didn’t show us what you have tried, we can’t tell what exactly you did or why it failed. After your clarifications, this looks like a straightforward job for the decorator pattern, which doesn’t need to touch any of CompletableFuture’s inner workings.

import java.util.concurrent.*;
import java.util.function.*;

public class MyCompletableFuture<T> extends CompletableFuture<T> {
    // Entry point: starts the task on e and remembers e for all follow-up stages
    public static <T> CompletableFuture<T> supplyAsync(Supplier<T> s, Executor e) {
        return my(CompletableFuture.supplyAsync(s, e), e);
    }
    // Wraps f in a decorator that completes with f's result or exception
    private static <T> CompletableFuture<T> my(CompletableFuture<T> f, Executor e) {
        MyCompletableFuture<T> my=new MyCompletableFuture<>(f, e);
        f.whenComplete((v,t)-> {
            if(t!=null) my.completeExceptionally(t); else my.complete(v);
        });
        return my;
    }
    private final CompletableFuture<T> baseFuture;
    private final Executor executor;

    MyCompletableFuture(CompletableFuture<T> base, Executor e) {
        baseFuture=base;
        executor=e;
    }
    // Wraps a dependent stage so that it keeps using this instance's executor;
    // the type parameter is named U to avoid shadowing the class's T
    private <U> CompletableFuture<U> my(CompletableFuture<U> base) {
        return my(base, executor);
    }
    @Override
    public CompletableFuture<Void> acceptEitherAsync(
            CompletionStage<? extends T> other, Consumer<? super T> action) {
        return my(baseFuture.acceptEitherAsync(other, action, executor));
    }
    @Override
    public <U> CompletableFuture<U> applyToEitherAsync(
            CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return my(baseFuture.applyToEitherAsync(other, fn, executor));
    }
    @Override
    public <U> CompletableFuture<U> handleAsync(
            BiFunction<? super T, Throwable, ? extends U> fn) {
        return my(baseFuture.handleAsync(fn, executor));
    }
    @Override
    public CompletableFuture<Void> runAfterBothAsync(
            CompletionStage<?> other, Runnable action) {
        return my(baseFuture.runAfterBothAsync(other, action, executor));
    }
    @Override
    public CompletableFuture<Void> runAfterEitherAsync(
            CompletionStage<?> other, Runnable action) {
        return my(baseFuture.runAfterEitherAsync(other, action, executor));
    }
    @Override
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
        return my(baseFuture.thenAcceptAsync(action, executor));
    }
    @Override
    public <U> CompletableFuture<Void> thenAcceptBothAsync(
            CompletionStage<? extends U> other,
            BiConsumer<? super T, ? super U> action) {
        return my(baseFuture.thenAcceptBothAsync(other, action, executor));
    }
    @Override
    public <U> CompletableFuture<U> thenApplyAsync(
            Function<? super T, ? extends U> fn) {
        return my(baseFuture.thenApplyAsync(fn, executor));
    }
    @Override
    public <U, V> CompletableFuture<V> thenCombineAsync(
            CompletionStage<? extends U> other,
            BiFunction<? super T, ? super U, ? extends V> fn) {
        return my(baseFuture.thenCombineAsync(other, fn, executor));
    }
    @Override
    public <U> CompletableFuture<U> thenComposeAsync(
            Function<? super T, ? extends CompletionStage<U>> fn) {
        return my(baseFuture.thenComposeAsync(fn, executor));
    }
    @Override
    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return my(baseFuture.thenRunAsync(action, executor));
    }
    @Override
    public CompletableFuture<T> whenCompleteAsync(
            BiConsumer<? super T, ? super Throwable> action) {
        return my(baseFuture.whenCompleteAsync(action, executor));
    }
}

Here is a simple test case which shows that it works as expected:

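ScheduledExecutorService ses=Executors.newSingleThreadScheduledExecutor();
// Executor that announces each submission and runs it two seconds later
Executor e=r -> {
    System.out.println("adding delay");
    ses.schedule(r, 2, TimeUnit.SECONDS);
};
// Only the first call names the executor; the later stages inherit it
MyCompletableFuture.supplyAsync(()->"initial value", e)
  .thenApplyAsync(String::hashCode)
  .thenApplyAsync(Integer::toOctalString)
  .thenAcceptAsync(System.out::println)
  .join(); // wait for the whole chain before shutting down
ses.shutdown(); // otherwise the scheduler's non-daemon thread keeps the JVM alive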
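
On Java 9 and later there may be a shorter route, because CompletableFuture gained two overridable hooks: defaultExecutor() and newIncompleteFuture(). The following is only a sketch under that assumption; the class name ExecutorBoundFuture and the helper threadsUsed are hypothetical, and it is an alternative to the decorator above rather than a replacement for it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical name; relies on hooks that exist only since Java 9.
class ExecutorBoundFuture<T> extends CompletableFuture<T> {
    private final Executor executor;

    ExecutorBoundFuture(Executor executor) {
        this.executor = executor;
    }

    // Same signature as CompletableFuture.supplyAsync, which it hides.
    public static <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier, Executor executor) {
        // completeAsync(Supplier) runs the supplier on defaultExecutor()
        return new ExecutorBoundFuture<T>(executor).completeAsync(supplier);
    }

    // Used by every *Async method that is called without an Executor.
    @Override
    public Executor defaultExecutor() {
        return executor;
    }

    // Used to create each dependent stage, so the executor is carried along.
    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() {
        return new ExecutorBoundFuture<>(executor);
    }

    // Runs three chained stages and returns the thread names they ran on.
    static String threadsUsed() {
        ExecutorService pool = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "bound-executor"));
        try {
            return ExecutorBoundFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    .thenApplyAsync(s -> s + "," + Thread.currentThread().getName())
                    .thenApplyAsync(s -> s + "," + Thread.currentThread().getName())
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadsUsed());
    }
}
```

Since newIncompleteFuture() is overridden, non-async stages such as thenApply should also return the subclass, whereas the decorator above only overrides the Async variants.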
Holger answered Sep 28 '22 05:09
