Timeout with default value in Java 8 CompletableFuture

Tags: java, java-8

Suppose I have some async computation, such as:

CompletableFuture
        .supplyAsync(() -> createFoo())
        .thenAccept(foo -> doStuffWithFoo(foo));

Is there a nice way to provide a default value for foo if the async supplier times out according to some specified timeout? Ideally, such functionality would attempt to cancel the slow-running supplier as well. For example, is there standard library functionality that is similar to the following hypothetical code:

CompletableFuture
        .supplyAsync(() -> createFoo())
        .acceptEither(
                CompletableFuture.completedAfter(50, TimeUnit.MILLISECONDS, DEFAULT_FOO),
                foo -> doStuffWithFoo(foo));

Or perhaps even better:

CompletableFuture
        .supplyAsync(() -> createFoo())
        .withDefault(DEFAULT_FOO, 50, TimeUnit.MILLISECONDS)
        .thenAccept(foo -> doStuffWithFoo(foo));

I know about get(timeout, unit), but am wondering if there's a nicer standard way of applying a timeout in an asynchronous and reactive fashion as suggested in the code above.

EDIT: Here's a solution inspired by the question "Java 8: Mandatory checked exceptions handling in lambda expressions. Why mandatory, not optional?". Unfortunately, it blocks a thread. If we instead relied on createFoo() to check for the timeout asynchronously and throw its own timeout exception, it would work without blocking a thread, but that would place more burden on the author of the supplier and would still incur the cost of creating an exception (which can be expensive without "fast throw"):

static <T> Supplier<T> wrapped(Callable<T> callable) {
    return () -> {
        try {
            return callable.call();
        } catch (RuntimeException e1) {
            throw e1;
        } catch (Throwable e2) {
            throw new RuntimeException(e2);
        }
    };
}

CompletableFuture
        .supplyAsync(wrapped(() -> CompletableFuture.supplyAsync(() -> createFoo()).get(50, TimeUnit.MILLISECONDS)))
        .exceptionally(e -> DEFAULT_FOO)
        .thenAcceptAsync(foo -> doStuffWithFoo(foo));
asked May 09 '14 22:05 by jonderry



3 Answers

CompletableFuture.supplyAsync is just a helper method that creates a CompletableFuture for you and submits the task to the common ForkJoinPool (ForkJoinPool.commonPool()).

You can write your own supplyAsync that meets your requirements, like this:

private static final ScheduledExecutorService schedulerExecutor = 
                                 Executors.newScheduledThreadPool(10);
private static final ExecutorService executorService = 
                                 Executors.newCachedThreadPool();


public static <T> CompletableFuture<T> supplyAsync(
        final Supplier<T> supplier, long timeoutValue, TimeUnit timeUnit,
        T defaultValue) {

    final CompletableFuture<T> cf = new CompletableFuture<T>();

    // as pointed out by Peti, the ForkJoinPool.commonPool() delivers a 
    // ForkJoinTask implementation of Future, that doesn't interrupt when cancelling
    // Using Executors.newCachedThreadPool instead in the example
    // submit task
    Future<?> future = executorService.submit(() -> {
        try {
            cf.complete(supplier.get());
        } catch (Throwable ex) {
            cf.completeExceptionally(ex);
        }
    });

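    // schedule a watcher that supplies the default once the timeout elapses
    schedulerExecutor.schedule(() -> {
        // complete() returns true only if this call completed the future,
        // so the task is cancelled only when the default actually won the race
        if (cf.complete(defaultValue)) {
            future.cancel(true);
        }
    }, timeoutValue, timeUnit);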

    return cf;
}

Creating the CompletableFuture with that helper is as easy as using the static method in CompletableFuture:

    CompletableFuture<String> a = supplyAsync(() -> "hi", 1,
            TimeUnit.SECONDS, "default");

To test it:

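    a = supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e1) {
            // interrupted by the watcher's cancel(true); nothing left to do
        }
        return "hi";
    }, 1, TimeUnit.SECONDS, "default");

    // the supplier needs 2 seconds, so the 1-second timeout wins
    System.out.println(a.join()); // prints "default"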
answered Oct 18 '22 21:10 by Ruben


Java 9 adds completeOnTimeout(T value, long timeout, TimeUnit unit), which does what you want, although it does not cancel the slow supplier.

There is also orTimeout(long timeout, TimeUnit unit), which completes the future exceptionally with a TimeoutException if the timeout elapses first.
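
For illustration, here is a minimal sketch of both methods on Java 9 or later. The class name, the two helper methods, and the slow createFoo() stand-in are assumptions made up for this example; only completeOnTimeout and orTimeout come from the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class Java9TimeoutSketch {

    // Hypothetical stand-in for the question's createFoo(): takes about 500 ms.
    static String createFoo() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "foo";
    }

    // Completes with defaultValue if createFoo() has not finished in time.
    static CompletableFuture<String> fooOrDefault(String defaultValue, long timeout, TimeUnit unit) {
        return CompletableFuture
                .supplyAsync(Java9TimeoutSketch::createFoo)
                .completeOnTimeout(defaultValue, timeout, unit);
    }

    // Completes exceptionally with a TimeoutException if createFoo() has not finished in time.
    static CompletableFuture<String> fooOrTimeout(long timeout, TimeUnit unit) {
        return CompletableFuture
                .supplyAsync(Java9TimeoutSketch::createFoo)
                .orTimeout(timeout, unit);
    }

    public static void main(String[] args) {
        // The 50 ms timeout elapses long before createFoo() returns.
        fooOrDefault("default", 50, TimeUnit.MILLISECONDS)
                .thenAccept(foo -> System.out.println("got " + foo))
                .join();

        try {
            fooOrTimeout(50, TimeUnit.MILLISECONDS).join();
        } catch (CompletionException e) {
            // join() wraps the TimeoutException in a CompletionException.
            System.out.println("timed out: " + (e.getCause() instanceof TimeoutException));
        }
    }
}
```

In both cases the supplier keeps running in the background after the timeout; its result is simply ignored.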

answered Oct 18 '22 20:10 by user140547


DZone has a good article on how to solve this: https://dzone.com/articles/asynchronous-timeouts

I'm not sure about the copyright of the code, so I can't copy it here. The solution is very much like the one from Dane White, but it uses a thread pool with a single thread plus schedule() to avoid wasting a thread just to wait for the timeout.

It also throws a TimeoutException instead of returning a default.
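
The general idea can be sketched in plain Java 8 roughly as follows. This is not the article's code: the class name and the failAfter and within helpers are hypothetical names chosen for this example, and the sketch only assumes the approach described above (one scheduler thread plus schedule()).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

class AsyncTimeoutSketch {

    // One daemon thread fires all timeouts; no thread blocks waiting on a single future.
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newScheduledThreadPool(1, runnable -> {
                Thread t = new Thread(runnable, "timeout-scheduler");
                t.setDaemon(true);
                return t;
            });

    // Hypothetical helper: a future that fails with a TimeoutException after the delay.
    static <T> CompletableFuture<T> failAfter(long delay, TimeUnit unit) {
        CompletableFuture<T> promise = new CompletableFuture<>();
        SCHEDULER.schedule(() -> {
            promise.completeExceptionally(
                    new TimeoutException("timed out after " + delay + " " + unit));
        }, delay, unit);
        return promise;
    }

    // Hypothetical helper: whichever completes first wins, the original future or the timeout.
    static <T> CompletableFuture<T> within(CompletableFuture<T> future, long delay, TimeUnit unit) {
        CompletableFuture<T> timeout = failAfter(delay, unit);
        return future.applyToEither(timeout, Function.identity());
    }

    public static void main(String[] args) {
        // Stand-in for a slow supplier that needs about 500 ms.
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "foo";
        });

        // exceptionally() turns the timeout (or any other failure) into a default value.
        String result = within(slow, 50, TimeUnit.MILLISECONDS)
                .exceptionally(e -> "default")
                .join();
        System.out.println(result);
    }
}
```

Note that exceptionally() maps every failure to the default, not only the timeout, and that this sketch neither cancels the slow supplier nor removes the pending timeout task when the original future finishes first.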

answered Oct 18 '22 21:10 by Aaron Digulla