CompletableFuture swallows exceptions?

I've been playing around with CompletableFuture and noticed a strange thing.

    String url = "http://google.com";

    CompletableFuture<String> contentsCF = readPageCF(url);
    CompletableFuture<List<String>> linksCF = contentsCF.thenApply(_4_CompletableFutures::getLinks);

    linksCF.thenAccept(list -> {
        assertThat(list, not(empty()));
    });

    linksCF.get();

If the assertion in my thenAccept call fails, the exception is not propagated. I then tried something even uglier:

    linksCF.thenAccept(list -> {
        String a = null;
        System.out.println(a.toString());
    });

Nothing happens; no exception is propagated. I tried handle and the other exception-related methods of CompletableFuture, but none of them propagated the exception as expected.

When I debugged CompletableFuture, I saw that it does catch the exception, like this:

    final void internalComplete(T v, Throwable ex) {
        if (result == null)
            UNSAFE.compareAndSwapObject
                (this, RESULT, null,
                 (ex == null) ? (v == null) ? NIL : v :
                 new AltResult((ex instanceof CompletionException) ? ex :
                               new CompletionException(ex)));
        postComplete(); // help out even if not triggered
    }

and nothing else.

I'm on JDK 1.8.0_05 x64, Windows 7.

Am I missing something here?

asked Jun 12 '14 11:06 by maciej



2 Answers

The problem is that you never ask for the result of your call to linksCF.thenAccept(..).

Your call to linksCF.get() waits for the chain up to linksCF to finish, but it only returns the result of the linksCF future. That does not include the outcome of your assertion.

linksCF.thenAccept(..) returns a new CompletableFuture instance. To get the exception that was thrown, call get() on the newly returned CompletableFuture instance, or check its status with isCompletedExceptionally().

    CompletableFuture<Void> acceptedCF = linksCF.thenAccept(list -> {
        assertThat(list, not(empty()));
    });

    acceptedCF.exceptionally(th -> {
        // will be executed when there is an exception.
        System.out.println(th);
        return null;
    });
    acceptedCF.get(); // will throw ExecutionException once results are available

Alternatively:

    CompletableFuture<List<String>> appliedCF = linksCF.thenApply(list -> {
        assertThat(list, not(empty()));
        return list;
    });

    appliedCF.exceptionally(th -> {
        // will be executed when there is an exception.
        System.out.println(th);
        return Collections.emptyList();
    });
    appliedCF.get(); // will throw ExecutionException once results are available
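
To make the point about the derived future concrete, here is a minimal, self-contained sketch. It is not the asker's code: the class name DerivedStageDemo, the sample links, and the IllegalStateException standing in for the failed assertion are all assumptions for illustration. It checks isCompletedExceptionally() on both futures, and it also shows that exceptionally(..) returns yet another future, so the future returned by thenAccept stays failed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class; names and data are illustrative only.
class DerivedStageDemo {

    // An already completed stand-in for linksCF.
    static CompletableFuture<List<String>> sampleLinks() {
        return CompletableFuture.completedFuture(Arrays.asList("link0", "link1"));
    }

    // Stand-in for the failing assertion inside thenAccept.
    static CompletableFuture<Void> failingAccept(CompletableFuture<List<String>> linksCF) {
        return linksCF.thenAccept(list -> {
            throw new IllegalStateException("check failed");
        });
    }

    // True when the source future is fine but the future returned by thenAccept failed.
    static boolean failureLandsInDerivedStage() {
        CompletableFuture<List<String>> linksCF = sampleLinks();
        CompletableFuture<Void> acceptedCF = failingAccept(linksCF);
        return !linksCF.isCompletedExceptionally()
            && acceptedCF.isCompletedExceptionally();
    }

    // True when exceptionally(..) yields a recovered future while the original stays failed.
    static boolean exceptionallyReturnsNewFuture() {
        CompletableFuture<Void> acceptedCF = failingAccept(sampleLinks());
        CompletableFuture<Void> recoveredCF = acceptedCF.exceptionally(th -> null);
        return acceptedCF.isCompletedExceptionally()
            && !recoveredCF.isCompletedExceptionally();
    }

    // join() on the derived future rethrows the failure wrapped in a CompletionException.
    static Throwable causeFromJoin() {
        try {
            failingAccept(sampleLinks()).join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println("failure lands in derived stage: " + failureLandsInDerivedStage());
        System.out.println("exceptionally returns new future: " + exceptionallyReturnsNewFuture());
        System.out.println("cause from join: " + causeFromJoin());
    }
}
```

Because the stand-in for linksCF is already completed, the thenAccept action runs immediately in the calling thread, so the status checks need no waiting. With an asynchronous source you would have to wait (for example with get() or join()) before the status is meaningful.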

answered Oct 16 '22 14:10 by Gregor Koukkoullis


Although the question is basically already answered by Gregor Koukkoullis (+1), here is an MCVE that I created to test this.

There are several options for obtaining the actual exception that caused the problem internally. However, I don't see why calling get on the future that is returned by thenAccept should be an issue. If in doubt, you could also use thenApply with the identity function and a nice fluent pattern, as in:

    List<String> list =
        readPage().
        thenApply(CompletableFutureTest::getLinks).
        thenApply(t -> {
            // check assertion here
            return t;
        }).get();

But maybe there's a particular reason why you want to avoid this.
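
As a rough sketch of that fluent pattern with a real check in place of the comment: the version below assumes a hypothetical fetchLinks helper in place of readPage().thenApply(getLinks), and uses an IllegalStateException as a stand-in for the assertion. Since the check runs inside the stage whose get() is called, a failure reaches the caller as an ExecutionException whose cause is the original exception.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class; fetchLinks is an assumed stand-in, not part of the question.
class FluentCheckDemo {

    // Stand-in for readPage().thenApply(getLinks): supplies the given links asynchronously.
    static CompletableFuture<List<String>> fetchLinks(List<String> links) {
        return CompletableFuture.supplyAsync(() -> links);
    }

    // The check lives in the same chain that get() is called on, so its failure is visible.
    static List<String> checkedLinks(List<String> links)
            throws InterruptedException, ExecutionException {
        return fetchLinks(links)
            .thenApply(list -> {
                if (list.isEmpty()) {
                    throw new IllegalStateException("no links found");
                }
                return list; // identity: pass the value on unchanged
            })
            .get();
    }

    // Returns the cause of the failure, or null if the check passed.
    static Throwable failureOf(List<String> links) {
        try {
            checkedLinks(links);
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    public static void main(String[] args) {
        System.out.println("empty list -> " + failureOf(Collections.emptyList()));
        System.out.println("one link   -> " + failureOf(Collections.singletonList("link0")));
    }
}
```

The design choice here is simply to keep the check in the stage you actually wait on, instead of in a side branch whose returned future is discarded.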

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.function.Supplier;

    public class CompletableFutureTest
    {
        public static void main(String[] args)
            throws InterruptedException, ExecutionException
        {
            CompletableFuture<String> contentsCF = readPage();
            CompletableFuture<List<String>> linksCF =
                contentsCF.thenApply(CompletableFutureTest::getLinks);

            CompletableFuture<Void> completionStage = linksCF.thenAccept(list ->
            {
                String a = null;
                System.out.println(a.toString());
            });

            // This will NOT cause an exception to be thrown, because
            // the outcome of the part that was passed to "thenAccept"
            // is NOT part of linksCF (it will be executed, but the
            // exception will not show up here)
            List<String> result = linksCF.get();
            System.out.println("Got "+result);

            // This will cause the exception to be thrown and
            // wrapped into an ExecutionException. The cause
            // of this ExecutionException can be obtained:
            try
            {
                completionStage.get();
            }
            catch (ExecutionException e)
            {
                System.out.println("Caught "+e);
                Throwable cause = e.getCause();
                System.out.println("cause: "+cause);
            }

            // Alternatively, the exception may be handled by
            // the future directly:
            completionStage.exceptionally(e ->
            {
                System.out.println("Future exceptionally finished: "+e);
                return null;
            });

            try
            {
                completionStage.get();
            }
            catch (Throwable t)
            {
                System.out.println("Already handled by the future "+t);
            }
        }

        private static List<String> getLinks(String s)
        {
            System.out.println("Getting links...");
            List<String> links = new ArrayList<String>();
            for (int i=0; i<10; i++)
            {
                links.add("link"+i);
            }
            dummySleep(1000);
            return links;
        }

        private static CompletableFuture<String> readPage()
        {
            return CompletableFuture.supplyAsync(new Supplier<String>()
            {
                @Override
                public String get()
                {
                    System.out.println("Getting page...");
                    dummySleep(1000);
                    return "page";
                }
            });
        }

        private static void dummySleep(int ms)
        {
            try
            {
                Thread.sleep(ms);
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }
    }

answered Oct 16 '22 14:10 by Marco13