CompletableFuture exception handling runAsync & thenRun

Let's say I have this sample code and an exception is thrown inside runAsync. Would this exception prevent thenRun from being executed, given that thenRun runs in the same thread as the caller method of this code?

private void caller() {
    CompletableFuture.runAsync(() -> {
        try {
            // some code
        } catch (Exception e) {
            throw new CustomException(errorMessage, e);
        }
    }, anInstanceOfTaskExecutor).thenRun(() -> {
        // thenRun code
    });
}

I already went through this thread and it explains how you can handle exceptions thrown from asynchronous blocks (i.e. by blocking and using join). I want to know whether the code inside the thenRun block would be executed if the CompletableFuture completes exceptionally.

Update:

I ran some code to test this:

CompletableFuture.runAsync(() -> {
      List<Integer> integerList = new ArrayList<>();
      integerList.get(1);    // throws exception
    }).thenRun(() -> {
      System.out.println("No exception occurred");
    });

It does not print anything, which means the exception didn't 'propagate up to/reach' the caller method's thread from the asynchronous block. I understand the expected behavior now, but I have the following questions:

  1. Why is it silently failing even though the CompletableFuture completesExceptionally?
  2. How does it work in the background?
  3. Is it because both these threads (caller's thread & asynchronous thread) have their own stack space?
Mr Matrix asked Jan 28 '20 02:01



1 Answer

General Information

The documentation of CompletionStage explains the general rules of the interface:

A stage of a possibly asynchronous computation, that performs an action or computes a value when another CompletionStage completes. A stage completes upon termination of its computation, but this may in turn trigger other dependent stages. The functionality defined in this interface takes only a few basic forms, which expand out to a larger set of methods to capture a range of usage styles:

  • The computation performed by a stage may be expressed as a Function, Consumer, or Runnable (using methods with names including apply, accept, or run, respectively) depending on whether it requires arguments and/or produces results. For example:

    stage.thenApply(x -> square(x))
         .thenAccept(x -> System.out.print(x))
         .thenRun(() -> System.out.println());
    

An additional form (compose) allows the construction of computation pipelines from functions returning completion stages.

Any argument to a stage's computation is the outcome of a triggering stage's computation.

  • One stage's execution may be triggered by completion of a single stage, or both of two stages, or either of two stages. Dependencies on a single stage are arranged using methods with prefix then. Those triggered by completion of both of two stages may combine their results or effects, using correspondingly named methods. Those triggered by either of two stages make no guarantees about which of the results or effects are used for the dependent stage's computation.

  • Dependencies among stages control the triggering of computations, but do not otherwise guarantee any particular ordering. Additionally, execution of a new stage's computations may be arranged in any of three ways: default execution, default asynchronous execution (using methods with suffix async that employ the stage's default asynchronous execution facility), or custom (via a supplied Executor). The execution properties of default and async modes are specified by CompletionStage implementations, not this interface. Methods with explicit Executor arguments may have arbitrary execution properties, and might not even support concurrent execution, but are arranged for processing in a way that accommodates asynchrony.

  • Two method forms (handle and whenComplete) support unconditional computation whether the triggering stage completed normally or exceptionally. Method exceptionally supports computation only when the triggering stage completes exceptionally, computing a replacement result, similarly to the java [sic] catch keyword. In all other cases, if a stage's computation terminates abruptly with an (unchecked) exception or error, then all dependent stages requiring its completion complete exceptionally as well, with a CompletionException holding the exception as its cause. If a stage is dependent on both of two stages, and both complete exceptionally, then the CompletionException may correspond to either one of these exceptions. If a stage is dependent on either of two others, and only one of them completes exceptionally, no guarantees are made about whether the dependent stage completes normally or exceptionally. In the case of method whenComplete, when the supplied action itself encounters an exception, then the stage completes exceptionally with this exception unless the source stage also completed exceptionally, in which case the exceptional completion from the source stage is given preference and propagated to the dependent stage.

All methods adhere to the above triggering, execution, and exceptional completion specifications (which are not repeated in individual method specifications). [...]
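To make the exceptional completion rule quoted above more concrete, here is a minimal sketch. The class and method names (ExceptionalCompletionSketch, recover, rootCause) are made up for illustration. It assumes the stage after a failing one is skipped and that exceptionally then receives the failure, possibly wrapped in a CompletionException, which is why the helper unwraps it before reading the message.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ExceptionalCompletionSketch {

    // Dependent stages may receive the failure wrapped in a CompletionException,
    // so unwrap it when a cause is present.
    static Throwable rootCause(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    // A failing stage, a dependent stage that is skipped, and a recovery stage.
    static String recover() {
        return CompletableFuture
                .<String>supplyAsync(() -> { throw new IllegalStateException("boom"); })
                .thenApply(value -> "normal: " + value)   // skipped: the trigger failed
                .exceptionally(ex -> "recovered: " + rootCause(ex).getMessage())
                .join();                                  // blocks until the chain finishes
    }

    public static void main(String[] args) {
        System.out.println(recover());
    }
}
```

Here exceptionally plays the role of a catch block: it turns the failed chain back into a normally completed one, so join() returns a value instead of throwing.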

[...]

And the documentation of CompletableFuture explains the threading rules (and other policies), some of which, as documented above, are left up to the implementations of CompletionStage:

A Future that may be explicitly completed (setting its value and status), and may be used as a CompletionStage, supporting dependent functions and actions that trigger upon its completion.

When two or more threads attempt to complete, completeExceptionally, or cancel a CompletableFuture, only one of them succeeds.

In addition to these and related methods for directly manipulating status and results, CompletableFuture implements interface CompletionStage with the following policies:

  • Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

  • All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool() (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task). This may be overridden for non-static methods in subclasses by defining method defaultExecutor(). To simplify monitoring, debugging, and tracking, all generated asynchronous tasks are instances of the marker interface CompletableFuture.AsynchronousCompletionTask. Operations with time-delays can use adapter methods defined in this class, for example: supplyAsync(supplier, delayedExecutor(timeout, timeUnit)). To support methods with delays and timeouts, this class maintains at most one daemon thread for triggering and cancelling actions, not for running them.

  • All CompletionStage methods are implemented independently of other public methods, so the behavior of one method is not impacted by overrides of others in subclasses.

  • All CompletionStage methods return CompletableFutures. To restrict usages to only those methods defined in interface CompletionStage, use method minimalCompletionStage(). Or to ensure only that clients do not themselves modify a future, use method copy().
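As an aside on the first policy above (which thread performs a non-async dependent action), here is a small sketch. The names DependentThreadSketch and "completer" are hypothetical. The documentation only says the action "may" run on the completing thread, so treat the observed thread names as what I would expect from the OpenJDK implementation rather than as a guarantee.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class DependentThreadSketch {

    // Registers thenRun on an incomplete future, then completes it from another thread.
    static String threadRunningDependent() {
        CompletableFuture<String> source = new CompletableFuture<>();
        AtomicReference<String> ranOn = new AtomicReference<>();

        // The future is not complete yet, so the action is only stored here.
        source.thenRun(() -> ranOn.set(Thread.currentThread().getName()));

        // Started only after registration, to avoid a race with thenRun above.
        Thread completer = new Thread(() -> source.complete("done"), "completer");
        completer.start();
        try {
            completer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ranOn.get();
    }

    // Registers thenRun on a future that is already complete.
    static boolean runsOnCallerWhenAlreadyComplete() {
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        CompletableFuture.completedFuture("done")
                .thenRun(() -> ranOn.set(Thread.currentThread()));
        return ranOn.get() == Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println("dependent ran on: " + threadRunningDependent());
        System.out.println("ran on caller: " + runsOnCallerWhenAlreadyComplete());
    }
}
```

The takeaway is that a non-async dependent action is not tied to the thread that built the chain. If you need a specific thread pool, use the Async variants with an explicit Executor.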

CompletableFuture also implements Future with the following policies:

  • Since (unlike FutureTask) this class has no direct control over the computation that causes it to be completed, cancellation is treated as just another form of exceptional completion. Method cancel has the same effect as completeExceptionally(new CancellationException()). Method isCompletedExceptionally() can be used to determine if a CompletableFuture completed in any exceptional fashion.

  • In case of exceptional completion with a CompletionException, methods get() and get(long, TimeUnit) throw an ExecutionException with the same cause as held in the corresponding CompletionException. To simplify usage in most contexts, this class also defines methods join() and getNow(T) that instead throw the CompletionException directly in these cases.

[...]
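The cancellation policy quoted above can be checked with a short sketch. CancellationSketch and describeCancelled are names invented for this example. It cancels a future that nothing will ever complete and then inspects it.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

class CancellationSketch {

    // Cancels an incomplete future and reports how it looks afterwards.
    static String describeCancelled() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.cancel(true); // the flag has no effect for CompletableFuture

        String joinOutcome;
        try {
            future.join();
            joinOutcome = "value";
        } catch (CancellationException e) {
            // join() reports cancellation directly rather than via CompletionException.
            joinOutcome = "CancellationException";
        }
        return future.isCancelled() + "/" + future.isCompletedExceptionally() + "/" + joinOutcome;
    }

    public static void main(String[] args) {
        System.out.println(describeCancelled());
    }
}
```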


Your Questions

Here's your example code:

CompletableFuture.runAsync(() -> {
      List<Integer> integerList = new ArrayList<>();
      integerList.get(1);    // throws exception
    }).thenRun(() -> {
      System.out.println("No exception occurred");
    });

If you aren't aware, methods such as thenRun return a new CompletionStage. So your code is similar to the following:

CompletableFuture<Void> runAsyncStage = CompletableFuture.runAsync(() -> List.of().get(0));
CompletableFuture<Void> thenRunStage =
    runAsyncStage.thenRun(() -> System.out.println("thenRun executing!"));

The thenRunStage is triggered by the completion of the runAsyncStage which, in this case, is guaranteed to complete exceptionally with an IndexOutOfBoundsException. As for why the Runnable isn't executed, that's due to the contract of CompletionStage#thenRun(Runnable):

Returns a new CompletionStage that, when this stage completes normally, executes the given action. See the CompletionStage documentation for rules covering exceptional completion.

Because the triggering stage completes exceptionally, thenRunStage also completes exceptionally, which means the Runnable is skipped.
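If you want to convince yourself of this, a sketch along the following lines should do. The class name SkippedThenRunSketch and the AtomicBoolean flag are additions of mine, and List.of() assumes Java 9 or later. The flag records whether the Runnable ran, and the dependent stage is inspected after it has finished.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

class SkippedThenRunSketch {

    static String observe() {
        AtomicBoolean actionRan = new AtomicBoolean(false);

        CompletableFuture<Void> runAsyncStage =
                CompletableFuture.runAsync(() -> List.of().get(0)); // always throws
        CompletableFuture<Void> thenRunStage =
                runAsyncStage.thenRun(() -> actionRan.set(true));

        boolean causeIsIndexError = false;
        try {
            thenRunStage.join(); // wait for the dependent stage to finish
        } catch (CompletionException e) {
            causeIsIndexError = e.getCause() instanceof IndexOutOfBoundsException;
        }
        return "ran=" + actionRan.get()
                + ", failed=" + thenRunStage.isCompletedExceptionally()
                + ", indexError=" + causeIsIndexError;
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```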

1. "Why is it silently failing even though the CompletableFuture completesExceptionally?"

The example code is the equivalent of swallowing an exception with a try-catch block. You don't see the exception because you haven't written code that would report the exception. Both the runAsyncStage and thenRunStage stages have completed exceptionally, the latter because of the former completing exceptionally.

If you want to be aware of the exception "within the chain" of stages then you have to use stages such as exceptionally[Async], handle[Async], and whenComplete[Async]. Doing it this way allows you to change the behavior of the chain based on normal or exceptional completion of a trigger stage.
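As a rough illustration of handling the failure within the chain, the sketch below appends whenComplete and handle to a chain shaped like yours. InChainReportingSketch, runChain, and the log list are hypothetical names. The unwrapping step assumes dependent stages may see the failure wrapped in a CompletionException.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;

class InChainReportingSketch {

    static String runChain() {
        List<String> log = new CopyOnWriteArrayList<>();

        CompletableFuture<String> outcome = CompletableFuture
                .runAsync(() -> { throw new IllegalArgumentException("bad input"); })
                .thenRun(() -> log.add("thenRun executed")) // skipped on failure
                .whenComplete((ignored, ex) -> {
                    // Runs on success and on failure; ex is null on success.
                    if (ex != null) {
                        Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                                ? ex.getCause() : ex;
                        log.add("whenComplete saw " + cause.getClass().getSimpleName());
                    }
                })
                // handle also runs either way, and replaces the outcome with a value.
                .handle((ignored, ex) -> ex == null ? "ok" : "failed");

        return outcome.join() + " " + log;
    }

    public static void main(String[] args) {
        System.out.println(runChain());
    }
}
```

whenComplete only observes the outcome and passes it on, while handle converts it, so the final join() returns normally even though the first stage failed.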

If you want to be aware of the exception "outside the chain" of stages then you have to use methods such as join(), get(), and get(long,TimeUnit). If the stage completed exceptionally, the first throws a CompletionException wrapping the cause of failure, while the latter two throw an ExecutionException wrapping the cause of failure.
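A minimal sketch of querying the chain from outside, assuming a chain that always fails with an IllegalStateException. OutsideChainSketch, failingChain, viaJoin, and viaGet are illustrative names only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class OutsideChainSketch {

    // A chain whose first stage always fails, so thenRun is skipped.
    static CompletableFuture<Void> failingChain() {
        return CompletableFuture
                .runAsync(() -> { throw new IllegalStateException("boom"); })
                .thenRun(() -> { });
    }

    // join() throws an unchecked CompletionException.
    static String viaJoin(CompletableFuture<?> future) {
        try {
            future.join();
            return "join: completed normally";
        } catch (CompletionException e) {
            return "join: CompletionException caused by " + e.getCause().getClass().getSimpleName();
        }
    }

    // get() throws a checked ExecutionException with the same cause.
    static String viaGet(CompletableFuture<?> future) {
        try {
            future.get();
            return "get: completed normally";
        } catch (ExecutionException e) {
            return "get: ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "get: interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(viaJoin(failingChain()));
        System.out.println(viaGet(failingChain()));
    }
}
```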

2. "How does it work in the background?"

The implementation of CompletableFuture is too complex to explain in a Stack Overflow answer. If you want to study the implementation you can look at the source code. Your JDK should have come with a src.zip file containing the Java source files. You can also look at the source code online in the OpenJDK repositories. For instance, here's the source code of CompletableFuture:

https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java

3. "Is it because both these threads (caller's thread & asynchronous thread) have their own stack space?"

One thread will not be aware of an exception in another thread unless there's some sort of communication between the two threads. Calling methods such as join() will, when appropriate, communicate the exception to the calling thread which will throw said exception. However, as shown by the answer to your first question, it's slightly more complicated than that. Even when a thread throws an exception within a single stage you won't see a stack trace or anything similar. This is because the exception is caught and the stage is marked as failed with that exception as the cause. Other code then has to explicitly retrieve and handle that exception, as needed.

This is no different from using an ExecutorService and the Future objects it returns. The task may fail in the background, but other code won't be aware of that until the Future is queried.
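For comparison, here is a sketch of the same situation with a plain ExecutorService. ExecutorServiceSketch and submitAndQuery are made-up names, and a single-thread executor is just a convenient choice for the example. Nothing is reported when the task fails. The failure only surfaces when get() is called.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorServiceSketch {

    static String submitAndQuery() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> { throw new IllegalStateException("background failure"); };

            // The task fails on the worker thread; the exception is stored in the Future.
            Future<String> future = executor.submit(task);

            try {
                return "result: " + future.get();
            } catch (ExecutionException e) {
                // Only now does the calling thread learn about the failure.
                return "get() reported: " + e.getCause().getMessage();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAndQuery());
    }
}
```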

From bounty: “I am looking to understand the details of how threads interact with each other.”

I'm not sure what else to add. The CompletionStage API is an abstraction "above" threads. You simply tell the API how you would like the chain of commands to execute, including which thread pools to use for each stage, and the implementation handles all the inter-thread communication for you. That said, each thread does its own thing; the API is just designed to provide an easier, reactive way to communicate between threads. If you're interested in how that's implemented, I recommend studying the source code (linked above).

Slaw answered Sep 30 '22 10:09