I'd like to subclass CompletableFuture to override the default Executor. That is, if a user invokes a method without specifying an Executor, I want my own Executor to get used instead of the one normally used by CompletableFuture.
The Javadoc hints at the possibility of subclassing:
All CompletionStage methods are implemented independently of other public methods, so the behavior of one method is not impacted by overrides of others in subclasses.
How am I supposed to implement static methods like CompletableFuture.supplyAsync() in the subclass if the underlying implementation depends on methods like internalComplete(), which is package-private?
How is one supposed to subclass CompletableFuture?
My user code needs to execute multiple tasks asynchronously using the same executor. For example: CompletableFuture.supplyAsync(..., executor).thenApplyAsync(..., executor).thenApplyAsync(..., executor). I'd like the custom CompletableFuture implementation to use the first executor throughout all follow-up calls.
Since you didn’t show us what you have tried, we have no way of finding out what exactly you did and why it failed. After your clarification, it looks like a straightforward job for the decorator pattern that doesn’t need to touch any of CompletableFuture’s inner workings.
import java.util.concurrent.*;
import java.util.function.*;

public class MyCompletableFuture<T> extends CompletableFuture<T> {
    // Entry point: run the supplier on e and remember e for all follow-up stages.
    public static <T> CompletableFuture<T> supplyAsync(Supplier<T> s, Executor e) {
        return my(CompletableFuture.supplyAsync(s, e), e);
    }

    // Wrap f in a decorator that completes with the same value or exception as f.
    private static <T> CompletableFuture<T> my(CompletableFuture<T> f, Executor e) {
        MyCompletableFuture<T> my = new MyCompletableFuture<>(f, e);
        f.whenComplete((v, t) -> {
            if (t != null) my.completeExceptionally(t); else my.complete(v);
        });
        return my;
    }

    private final CompletableFuture<T> baseFuture;
    private final Executor executor;

    MyCompletableFuture(CompletableFuture<T> base, Executor e) {
        baseFuture = base;
        executor = e;
    }

    // Wrap a dependent stage, passing on this future's executor.
    // The type parameter is named U so that it does not shadow the class's T.
    private <U> CompletableFuture<U> my(CompletableFuture<U> base) {
        return my(base, executor);
    }

    // Each executor-less *Async method delegates to the overload that takes an
    // explicit Executor, supplying the remembered one.

    @Override
    public CompletableFuture<Void> acceptEitherAsync(
            CompletionStage<? extends T> other, Consumer<? super T> action) {
        return my(baseFuture.acceptEitherAsync(other, action, executor));
    }

    @Override
    public <U> CompletableFuture<U> applyToEitherAsync(
            CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return my(baseFuture.applyToEitherAsync(other, fn, executor));
    }

    @Override
    public <U> CompletableFuture<U> handleAsync(
            BiFunction<? super T, Throwable, ? extends U> fn) {
        return my(baseFuture.handleAsync(fn, executor));
    }

    @Override
    public CompletableFuture<Void> runAfterBothAsync(
            CompletionStage<?> other, Runnable action) {
        return my(baseFuture.runAfterBothAsync(other, action, executor));
    }

    @Override
    public CompletableFuture<Void> runAfterEitherAsync(
            CompletionStage<?> other, Runnable action) {
        return my(baseFuture.runAfterEitherAsync(other, action, executor));
    }

    @Override
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
        return my(baseFuture.thenAcceptAsync(action, executor));
    }

    @Override
    public <U> CompletableFuture<Void> thenAcceptBothAsync(
            CompletionStage<? extends U> other,
            BiConsumer<? super T, ? super U> action) {
        return my(baseFuture.thenAcceptBothAsync(other, action, executor));
    }

    @Override
    public <U> CompletableFuture<U> thenApplyAsync(
            Function<? super T, ? extends U> fn) {
        return my(baseFuture.thenApplyAsync(fn, executor));
    }

    @Override
    public <U, V> CompletableFuture<V> thenCombineAsync(
            CompletionStage<? extends U> other,
            BiFunction<? super T, ? super U, ? extends V> fn) {
        return my(baseFuture.thenCombineAsync(other, fn, executor));
    }

    @Override
    public <U> CompletableFuture<U> thenComposeAsync(
            Function<? super T, ? extends CompletionStage<U>> fn) {
        return my(baseFuture.thenComposeAsync(fn, executor));
    }

    @Override
    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return my(baseFuture.thenRunAsync(action, executor));
    }

    @Override
    public CompletableFuture<T> whenCompleteAsync(
            BiConsumer<? super T, ? super Throwable> action) {
        return my(baseFuture.whenCompleteAsync(action, executor));
    }
}
Here is a simple test case which shows that it works as expected:
ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
// Custom executor that announces itself and delays every task by two seconds.
Executor e = r -> {
    System.out.println("adding delay");
    ses.schedule(r, 2, TimeUnit.SECONDS);
};
MyCompletableFuture.supplyAsync(() -> "initial value", e)
    .thenApplyAsync(String::hashCode)
    .thenApplyAsync(Integer::toOctalString)
    .thenAcceptAsync(System.out::println);
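If you can rely on Java 9 or later, there may be a shorter route: that release added the overridable methods defaultExecutor() and newIncompleteFuture() to CompletableFuture, so a subclass can supply its own executor without decorating every async method. The following is only a sketch under that assumption; the class name ExecutorBoundFuture and the factory supplyOn are made up for illustration and are not part of the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical name; assumes Java 9+ (defaultExecutor/newIncompleteFuture/completeAsync).
class ExecutorBoundFuture<T> extends CompletableFuture<T> {
    private final Executor executor;

    ExecutorBoundFuture(Executor executor) {
        this.executor = executor;
    }

    // Hypothetical factory standing in for the static supplyAsync(supplier, executor).
    static <T> ExecutorBoundFuture<T> supplyOn(Supplier<T> supplier, Executor executor) {
        ExecutorBoundFuture<T> f = new ExecutorBoundFuture<>(executor);
        // completeAsync(Supplier) runs the supplier on defaultExecutor().
        f.completeAsync(supplier);
        return f;
    }

    // Used by every *Async method that is called without an explicit Executor.
    @Override
    public Executor defaultExecutor() {
        return executor;
    }

    // Dependent stages are created through this method, so they keep the executor.
    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() {
        return new ExecutorBoundFuture<>(executor);
    }
}
```

With this in place, ExecutorBoundFuture.supplyOn(() -> "initial value", e).thenApplyAsync(String::hashCode) should run both steps on e. Static factories such as CompletableFuture.supplyAsync() still return plain CompletableFuture instances, which is why the sketch needs its own factory method.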