
How can I capture the RejectedExecutionException thrown in a CompletableFuture's whenCompleteAsync invocation?

In the following example code, I attach a BiConsumer that sleeps for 100 milliseconds as the completion action of each CompletableFuture in a set. I use the whenCompleteAsync method and pass it a separate executorService. executorService is a ThreadPoolExecutor with a core pool size of 5, a maximum pool size of 5, and a queue capacity of 1.

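import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CompleteTest {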
    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(5, 5, 10,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));

        ArrayList<CompletableFuture<String>> list = new ArrayList<>();

        for (int i = 0; i <100; i++) {
            CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>();
            stringCompletableFuture.whenCompleteAsync((e, a) -> {
                System.out.println("Complete " + e);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e1) {e1.printStackTrace();}
            }, executorService);

            list.add(stringCompletableFuture);
        }

        for (int i = 0; i < list.size(); i++) {
            list.get(i).complete(i + "");
        }
    }
}

When I run the code, only 6 outputs are printed even though I complete 100 futures: 5 for the core threads and 1 for the queued task. What happens to the rest? If the other tasks could not be submitted to the executor service because the queue was already full, shouldn't there be an exception?

Output

Complete 0
Complete 1
Complete 2
Complete 3
Complete 4
Complete 5
asked Feb 21 '17 16:02 by Sudheera


1 Answer

An exception is thrown, and a CompletableFuture is completed exceptionally, just not any of those you're tracking.

You're creating the ThreadPoolExecutor with a constructor that uses the default RejectedExecutionHandler (ThreadPoolExecutor.AbortPolicy), which simply throws an exception. We know that a RejectedExecutionException is thrown if an ExecutorService cannot accept a task. So where is the task added and where is the exception thrown?
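As a quick check of the default handler on its own, independent of CompletableFuture, here is a minimal sketch using the same pool settings as the question. The class name DefaultHandlerDemo, the method countRejections, and the latch used to keep the workers busy are illustrative assumptions, not part of the original code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the name is not from the original post.
class DefaultHandlerDemo {

    /** Submits blocking tasks to a 5-thread pool with a queue of 1 and counts the rejections. */
    static int countRejections(int tasks) {
        // Same settings as in the question; no handler is passed, so the default applies.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 10,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        // Keeps every accepted task busy until all submissions have been attempted.
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    executor.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // Thrown on the submitting thread by the default handler.
                    rejected++;
                }
            }
        } finally {
            release.countDown();
            executor.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 5 tasks occupy the threads, 1 sits in the queue, the remaining 4 are rejected.
        System.out.println("rejected: " + countRejections(10));
    }
}
```

With a direct execute call the exception reaches the submitting code. In the question, the submission happens inside CompletableFuture, so the exception surfaces somewhere else.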

As it stands, all of the chaining happens within whenCompleteAsync. When you call it, it creates and returns a new CompletableFuture and registers a dependent on the receiver CompletableFuture, stringCompletableFuture. When stringCompletableFuture is later completed (successfully, in this case), it attempts to schedule the given BiConsumer on the given ExecutorService.

Since the ExecutorService's queue has no space and all of its threads are busy, it invokes the RejectedExecutionHandler, which throws a RejectedExecutionException. That exception is caught at that point and used to complete exceptionally the CompletableFuture that whenCompleteAsync returned.
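The sequence described above can be reproduced on a smaller scale. The following is a minimal sketch, assuming a pool with one thread and a queue of one so that the third completion is the one rejected; the class name DependentRejectionDemo, the method rejectedDependentCause, and the latch are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the name is not from the original post.
class DependentRejectionDemo {

    /** Returns the simple class name of the cause that failed the third dependent future. */
    static String rejectedDependentCause() {
        // One thread and one queue slot: the third scheduled action has nowhere to go.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 10,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        try {
            List<CompletableFuture<String>> dependents = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                CompletableFuture<String> source = new CompletableFuture<>();
                // Keep the future returned by whenCompleteAsync; it carries the failure.
                dependents.add(source.whenCompleteAsync((value, error) -> {
                    try {
                        release.await(); // keep the pool busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }, executor));
                // Completing the source is what triggers scheduling on the executor.
                source.complete("value " + i);
            }
            try {
                dependents.get(2).join();
                return "no exception";
            } catch (CompletionException e) {
                // join() wraps the original exception; the cause is the rejection.
                return e.getCause().getClass().getSimpleName();
            }
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectedDependentCause());
    }
}
```

Note that source.complete itself returns normally here; the rejection is only visible through the dependent future.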

In other words, in your for loop, capture the CompletableFuture returned by whenCompleteAsync, store it, and print out its state.

ArrayList<CompletableFuture<String>> list = new ArrayList<>();
ArrayList<CompletableFuture<?>> dependents = new ArrayList<>();
for (int i = 0; i <100; i++) {
    CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>();
    CompletableFuture<?> thisWillHaveException = stringCompletableFuture.whenCompleteAsync((e, a) -> {
        System.out.println("Complete " + e);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e1) {e1.printStackTrace();}
    }, executorService);
    dependents.add(thisWillHaveException);
    list.add(stringCompletableFuture);
}

for (int i = 0; i < list.size(); i++) {
    list.get(i).complete(i + "");
}
Thread.sleep(2000);
dependents.forEach(cf -> {
    cf.whenComplete((r, e) -> {
        if (e != null)
            System.out.println(cf + " " + e.getMessage());
    });
});

You'll notice that they are all (except the 6 that were successfully printed earlier) completed exceptionally with a RejectedExecutionException.

...
java.util.concurrent.CompletableFuture@2d8e6db6[Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniWhenComplete@3f91beef rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
java.util.concurrent.CompletableFuture@23ab930d[Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniWhenComplete@1a6c5a9e rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
java.util.concurrent.CompletableFuture@4534b60d[Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniWhenComplete@37bba400 rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
answered Nov 04 '22 05:11 by Sotirios Delimanolis