Suppose I have some async computation, such as:
CompletableFuture
    .supplyAsync(() -> createFoo())
    .thenAccept(foo -> doStuffWithFoo(foo));
Is there a nice way to provide a default value for foo if the async supplier times out according to some specified timeout? Ideally, such functionality would attempt to cancel the slow-running supplier as well. For example, is there standard library functionality that is similar to the following hypothetical code:
CompletableFuture
    .supplyAsync(() -> createFoo())
    .acceptEither(
        CompletableFuture.completedAfter(50, TimeUnit.MILLISECONDS, DEFAULT_FOO),
        foo -> doStuffWithFoo(foo));
Or perhaps even better:
CompletableFuture
    .supplyAsync(() -> createFoo())
    .withDefault(DEFAULT_FOO, 50, TimeUnit.MILLISECONDS)
    .thenAccept(foo -> doStuffWithFoo(foo));
I know about get(timeout, unit), but am wondering if there's a nicer standard way of applying a timeout in an asynchronous and reactive fashion, as suggested in the code above.
EDIT: Here's a solution inspired by the question "Java 8: Mandatory checked exceptions handling in lambda expressions. Why mandatory, not optional?", but unfortunately it blocks a thread. If we relied on createFoo() to check for the timeout asynchronously and throw its own timeout exception, it would work without blocking a thread, but that would place more burden on the author of the supplier and would still incur the cost of creating an exception (which can be expensive without "fast throw"):
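// Adapts a Callable (which may throw checked exceptions) to a Supplier.
static <T> Supplier<T> wrapped(Callable<T> callable) {
    return () -> {
        try {
            return callable.call();
        } catch (RuntimeException e1) {
            throw e1;
        } catch (Exception e2) {
            // Wrap checked exceptions (including TimeoutException) so the lambda compiles.
            throw new RuntimeException(e2);
        }
    };
}

CompletableFuture
    // The outer task blocks a pool thread in get() for up to 50 ms.
    .supplyAsync(wrapped(() -> CompletableFuture.supplyAsync(() -> createFoo()).get(50, TimeUnit.MILLISECONDS)))
    .exceptionally(e -> DEFAULT_FOO)
    .thenAcceptAsync(foo -> doStuffWithFoo(foo));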
orTimeout() is an instance method of CompletableFuture that completes the future exceptionally with a TimeoutException if it has not completed within the given period. The method itself does not throw; the timeout surfaces through the future. It was introduced in Java 9.
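To make the orTimeout() behaviour concrete, here is a minimal sketch that turns the timeout into a fallback value. It assumes Java 9 or later; the class name OrTimeoutSketch and the helpers slowFoo and fooOrFallback are hypothetical names invented for this illustration, not part of the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class OrTimeoutSketch {

    // Hypothetical stand-in for a slow supplier such as createFoo().
    static String slowFoo(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "foo";
    }

    // Yields the supplier's value, or fallback if the timeout fires first.
    // Failures other than a timeout are passed on to the caller.
    static CompletableFuture<String> fooOrFallback(long workMillis, long timeoutMillis, String fallback) {
        return CompletableFuture
                .supplyAsync(() -> slowFoo(workMillis))
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                .exceptionally(ex -> {
                    // The exception may or may not arrive wrapped in a CompletionException.
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    if (cause instanceof TimeoutException) {
                        return fallback;
                    }
                    throw new CompletionException(cause);
                });
    }

    public static void main(String[] args) {
        // Slow supplier: the timeout fires first, so the fallback is used.
        System.out.println(fooOrFallback(500, 50, "default").join());
        // Fast supplier: the real value arrives well before the timeout.
        System.out.println(fooOrFallback(0, 2000, "default").join());
    }
}
```

Note that the supplier keeps running after the timeout; orTimeout() only completes the future and does not interrupt the task.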
CompletableFuture is used for asynchronous programming in Java. Asynchronous programming is a means of writing non-blocking code by running a task on a thread separate from the main application thread and notifying the main thread about its progress, completion or failure.
CompletableFuture.supplyAsync is just a helper method that creates a CompletableFuture for you and submits the task to the common pool, ForkJoinPool.commonPool().
You can create your own supplyAsync with your requirements like this:
private static final ScheduledExecutorService schedulerExecutor =
        Executors.newScheduledThreadPool(10);
private static final ExecutorService executorService =
        Executors.newCachedThreadPool();

public static <T> CompletableFuture<T> supplyAsync(
        final Supplier<T> supplier, long timeoutValue, TimeUnit timeUnit,
        T defaultValue) {

    final CompletableFuture<T> cf = new CompletableFuture<>();

    // as pointed out by Peti, the ForkJoinPool.commonPool() delivers a
    // ForkJoinTask implementation of Future, that doesn't interrupt when cancelling
    // Using Executors.newCachedThreadPool instead in the example

    // submit task
    Future<?> future = executorService.submit(() -> {
        try {
            cf.complete(supplier.get());
        } catch (Throwable ex) {
            cf.completeExceptionally(ex);
        }
    });

    // schedule watcher
    schedulerExecutor.schedule(() -> {
        // complete() returns true only if this call completed the future,
        // so the task is cancelled only when the default value actually won.
        if (cf.complete(defaultValue)) {
            future.cancel(true);
        }
    }, timeoutValue, timeUnit);

    return cf;
}
Creating the CompletableFuture with that helper is as easy as using the static method in CompletableFuture:
CompletableFuture<String> a = supplyAsync(() -> "hi", 1,
        TimeUnit.SECONDS, "default");
To test it:
a = supplyAsync(() -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e1) {
        // interrupted by future.cancel(true) once the timeout fires; ignore
    }
    return "hi";
}, 1, TimeUnit.SECONDS, "default");
Java 9 added completeOnTimeout(T value, long timeout, TimeUnit unit), which does what you want, although it does not cancel the slow supplier.
There is also orTimeout(long timeout, TimeUnit unit), which completes the future exceptionally with a TimeoutException if the timeout elapses.
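As a rough illustration of completeOnTimeout, the sketch below mirrors the hypothetical withDefault call from the question. It assumes Java 9 or later; CompleteOnTimeoutSketch, createFoo and fooWithDefault are placeholder names made up for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class CompleteOnTimeoutSketch {

    static final String DEFAULT_FOO = "default";

    // Hypothetical stand-in for the question's createFoo(); sleeps to simulate slow work.
    static String createFoo(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "foo";
    }

    // Completes with DEFAULT_FOO if createFoo has not produced a value in time.
    static CompletableFuture<String> fooWithDefault(long workMillis, long timeoutMillis) {
        return CompletableFuture
                .supplyAsync(() -> createFoo(workMillis))
                .completeOnTimeout(DEFAULT_FOO, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        // The supplier needs about 500 ms but only 50 ms are allowed, so the default is consumed.
        fooWithDefault(500, 50)
                .thenAccept(foo -> System.out.println("got: " + foo))
                .join();
    }
}
```

The supplier thread is not interrupted here; it runs to completion and its late result is simply discarded.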
DZone has a good article on how to solve this: https://dzone.com/articles/asynchronous-timeouts
I'm not sure about the copyright of the code, hence I can't copy it here. The solution is very much like the one from Dane White, but it uses a thread pool with a single thread plus schedule() to avoid wasting a thread just to wait for the timeout. It also throws a TimeoutException instead of returning a default.
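The general idea described above (a single scheduler thread plus schedule(), racing a timeout future against the real one) can be sketched for Java 8 roughly as follows. This is not the article's code; the names ScheduledTimeoutSketch, failAfter, within, slowFoo and fooOrDefault are assumptions made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

class ScheduledTimeoutSketch {

    // One daemon thread is enough: it only fires timers and never runs the real work.
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newScheduledThreadPool(1, r -> {
                Thread t = new Thread(r, "timeout-scheduler");
                t.setDaemon(true);
                return t;
            });

    // Returns a future that fails with a TimeoutException after the given delay.
    static <T> CompletableFuture<T> failAfter(long delay, TimeUnit unit) {
        CompletableFuture<T> timeout = new CompletableFuture<>();
        SCHEDULER.schedule(() -> {
            timeout.completeExceptionally(
                    new TimeoutException("timed out after " + delay + " " + unit));
        }, delay, unit);
        return timeout;
    }

    // Races the given future against the timeout; whichever completes first wins.
    static <T> CompletableFuture<T> within(CompletableFuture<T> future, long delay, TimeUnit unit) {
        return future.applyToEither(failAfter(delay, unit), Function.identity());
    }

    // Hypothetical slow supplier used only to exercise the helpers.
    static String slowFoo(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "foo";
    }

    // Converts any failure, including the timeout, into a fallback value.
    static CompletableFuture<String> fooOrDefault(long workMillis, long timeoutMillis, String fallback) {
        CompletableFuture<String> work = CompletableFuture.supplyAsync(() -> slowFoo(workMillis));
        return within(work, timeoutMillis, TimeUnit.MILLISECONDS).exceptionally(ex -> fallback);
    }

    public static void main(String[] args) {
        System.out.println(fooOrDefault(500, 50, "default").join());
        System.out.println(fooOrDefault(0, 2000, "default").join());
    }
}
```

As with the Java 9 methods, this does not cancel the slow supplier; adding cancellation would require keeping a handle on the underlying task, as in the executor-based helper shown earlier in the thread.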