Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to chain non-blocking action in CompletionStage.exceptionally

I am writing a Play2 application service method in Java that should do the following. Asynchronously call method A, and if that fails, asynchronously call method B.

To illustrate assume this interface for the backend called by the service:

public interface MyBackend {
    CompletionStage<Object> tryWrite(Object foo);
    CompletionStage<Object> tryCleanup(Object foo);
}

So in my service method, I want to return a Future that can complete with these:

  • Success of tryWrite completed
  • Fail of tryWrite and Success of tryCleanup completed and failing with exception of tryWrite()

(Note: Of course tryWrite() could do any cleanup itself, this is a simplified example to illustrate a problem)

The implementation of a service calling the backend like this seems difficult to me because the CompletionStage.exceptionally() method does not allow Composing.

Version 1:

public class MyServiceImpl {
    public CompletionStage<Object> tryWriteWithCleanup(Object foo) {

        CompletionStage<Object> writeFuture = myBackend.tryWrite(foo)
            .exceptionally((throwable) -> {
                CompletionStage<Object> cleanupFuture = myBackend.tryCleanup(foo);
                throw new RuntimeException(throwable);
        });
        return writeFuture;
    }
}

So version 1 calls tryCleanup(foo) in a non-blocking way, but the CompletionStage returned by tryWriteWithCleanup() will not wait for cleanupFuture to complete. How to change this code to return a future from the service that would also wait for completion of cleanupFuture?

Version 2:

public class MyServiceImpl {
    public CompletionStage<Object> tryWriteWithCleanup(Object foo) {

        final AtomicReference<Throwable> saveException = new AtomicReference<>();
        CompletionStage<Object> writeFuture = myBackend
            .tryWrite(foo)
            .exceptionally(t -> {
                saveException.set(t);
                // continue with cleanup
                return null;
            })
            .thenCompose((nil) -> {
                // if no cleanup necessary, return
                if (saveException.get() == null) {
                    return CompletableFuture.completedFuture(null);
                }
                return CompletionStage<Object> cleanupFuture = myBackend.tryCleanup(foo)
                    .exceptionally(cleanupError -> {
                        // log error
                        return null;
                    })
                    .thenRun(() -> {
                        throw saveException.get();
                    });
        });
        return writeFuture;
    }
}

Version2 uses an external AtomicReference to store the failure, and makes the asynchronous second call in another thenCompose() block, if there was a failure.

All my other attempts to do so ended up so unwieldy that I don't want to paste them here.

like image 809
tkruse Avatar asked Jun 14 '17 02:06

tkruse


People also ask

How do you handle exceptions in runAsync?

runAsync(() -> { //process and throw exception }, anInstanceOfTaskExecutor ) . thenRun(() -> {}) . exceptionally(exception -> { // do something, handle exception }) )); In this case, it will execute thenRun .

What is exceptionally in CompletableFuture?

exceptionally. public CompletableFuture<T> exceptionally( Function<Throwable, ? extends T> fn) { ... } In method exceptionally() , you only have access to the exception and not the result. Because as the method name indicates, the method only handles exceptional cases: when an exception happened.

What is Completion stage?

public interface CompletionStage<T> A stage of a possibly asynchronous computation, that performs an action or computes a value when another CompletionStage completes. A stage completes upon termination of its computation, but this may in turn trigger other dependent stages.

How do Completable Futures work?

CompletableFuture is a Future . It overrides methods of future, meaning that you can wait for the result of the future, with or without a timeout. You can request the status of the future (whether it's done), etc. Waits if necessary for this future to complete, and then returns its result.

How do I chain completionstage methods?

The methods which return CompletionStage<U> can be chained with methods which takes Consumer<U> or Function<U,R>. Following is another example which reads the content of a web page:

What is the difference between completionstage handle () and whencomplete () methods?

The above methods, accept BiConsumer, whereas CompletionStage.handle (....) methods accept BiFunction. That means handle () methods are allowed to return a result (in case of exception a recovering result) thus they can handle the exception. On the other hand, whenComplete () methods cannot return a results.

What is completionstage in Java?

java.util.concurrent.CompletionStage<T> interface represents a commutation task (either synchronous or asynchronous). As all methods declared in this interface return an instance of CompletionStage itself, multiple CompletionStages can be chained together in different ways to complete a group of tasks.

What is the use of unhandled exceptions in completionstages?

So they are used as merely callbacks that do not interfere in the processing pipeline of CompletionStages. If there's an unhandled exception coming from the stages before 'whenComplete' stage then that exception is passed through as it is.


2 Answers

Unfortunately CompletionStage/CompletableFuture does not provide exception handling API's with composition.

You can work around this though by relying on a handle() with a BiFunction that returns a CompletionStage. This will give you nested stages (CompletionStage<CompletionStage<Object>>) that you can the "unnest" using compose(identity()):

public CompletionStage<Object> tryWriteWithCleanup(Object foo) {
    return myBackend.tryWrite(foo)
            .handle((r, e) -> {
                if (e != null) {
                    return myBackend.tryCleanup(foo)
                            .handle((r2, e2) -> {
                                // Make sure we always return the original exception
                                // but keep track of new exception if any,
                                // as if run in a finally block
                                if (e2 != null) {
                                    e.addSuppressed(e2);
                                }
                                // wrapping in CompletionException  behaves as if
                                // we threw the original exception
                                throw new CompletionException(e);
                            });
                }
                return CompletableFuture.completedFuture(r);
            })
            .thenCompose(Function.identity());
}
like image 101
Didier L Avatar answered Dec 02 '22 08:12

Didier L


You may simply wait for the completion inside the handler:

public CompletionStage<Object> tryWriteWithCleanup(Object foo) {
    return myBackend.tryWrite(foo).exceptionally(throwable -> {
        myBackend.tryCleanup(foo).toCompletableFuture().join();
        throw new CompletionException(throwable);
    });
}

This will defer the completion of the result CompletionStage to the completion of the cleanup stage. Using CompletionException as wrapper will make the wrapping transparent to the caller.

However, it has some drawbacks. While the framework might utilize the thread while waiting or spawn a compensation thread, if it is a worker thread, the blocked thread might be the caller thread if the stage returned by tryWrite happens to be already completed when entering exceptionally. Unfortunately, there is no exceptionallyAsync method. You may use handleAsync instead, but it will complicate the code while still feeling like a kludge.

Further, exceptions thrown by the cleanup may shadow the original failure.

A cleaner solution may be a bit more involved:

public CompletionStage<Object> tryWriteWithCleanup(Object foo) {

    CompletableFuture<Object> writeFuture = new CompletableFuture<>();

    myBackend.tryWrite(foo).whenComplete((obj,throwable) -> {
        if(throwable==null)
            writeFuture.complete(obj);
        else
            myBackend.tryCleanup(foo).whenComplete((x,next) -> {
                try {
                    if(next!=null) throwable.addSuppressed(next);
                }
                finally {
                    writeFuture.completeExceptionally(throwable);
                }
        });
    });
    return writeFuture;
}

This simply creates a CompletableFuture manually, allowing to control its completion, which will happen either directly by the action chained to tryWrite’s stage in the successful case, or by the action chained to the cleanup stage in the exceptional case. Note that the latter takes care about chaining a possible subsequent cleanup exception via addSuppressed.

like image 26
Holger Avatar answered Dec 02 '22 07:12

Holger