Following example code I'm injecting a biconsumer
which sleeps for 100 millis as a completion action of a set of completable future. I've used whenCompleteAsync
method by giving a separate executorService
to use. executorService
is a ThreadPoolExecutor
with core pool size 5, max size 5 and queue length of 1.
public class CompleteTest {
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(5, 5, 10,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
ArrayList<CompletableFuture<String>> list = new ArrayList<>();
for (int i = 0; i <100; i++) {
CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>();
stringCompletableFuture.whenCompleteAsync((e, a) -> {
System.out.println("Complete " + e);
try {
Thread.sleep(100);
} catch (InterruptedException e1) {e1.printStackTrace();}
}, executorService);
list.add(stringCompletableFuture);
}
for (int i = 0; i < list.size(); i++) {
list.get(i).complete(i + "");
}
}
}
When I ran the code, even though I'm completing 100 futures only 6 outputs get printed. That is 5 core threads and 1 queued one. What happens to the rest? If other runnables could not submitted to the executor service because of the queue already full, shouldn't be there an exception.?
OutPut
Complete 0
Complete 1
Complete 2
Complete 3
Complete 4
Complete 5
An exception is thrown, and a CompletableFuture
is completed exceptionally, just not any of those you're tracking.
You're instantiating and initializing a ThreadPoolExecutor
with a constructor which uses a default RejectedExecutionHandler
that simply throws an exception. We know that a RejectedExecutionException
is thrown if an ExecutorService
cannot accept a task. So where is the task added and where is the exception thrown?
As it stands, all of the chaining happens within whenCompleteAsync
. When you call that, you add a dependent to the receiver CompletableFuture
, stringCompletableFuture
. When stringCompletableFuture
is completed (successfully, in this case), it will create a new CompletableFuture
(which it returns) and attempt to schedule the given BiConsumer
on the given ExecutorService
.
Since the ExecutorService
's queue has no space, it will invoke the RejectedExecutionHandler
which will throw RejectedExecutionException
. That exception is captured at that time and used to completeExceptionally
the CompletableFuture
that will be returned.
In other words, in your for
loop, capture the CompletableFuture
returned by whenCompleteAsync
, store it, and print out its state.
ArrayList<CompletableFuture<String>> list = new ArrayList<>();
ArrayList<CompletableFuture<?>> dependents = new ArrayList<>();
for (int i = 0; i <100; i++) {
CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>();
CompletableFuture<?> thisWillHaveException = stringCompletableFuture.whenCompleteAsync((e, a) -> {
System.out.println("Complete " + e);
try {
Thread.sleep(100);
} catch (InterruptedException e1) {e1.printStackTrace();}
}, executorService);
dependents.add(thisWillHaveException);
list.add(stringCompletableFuture);
}
for (int i = 0; i < list.size(); i++) {
list.get(i).complete(i + "");
}
Thread.sleep(2000);
dependents.forEach(cf -> {
cf.whenComplete((r, e) -> {
if (e != null)
System.out.println(cf + " " + e.getMessage());
});
});
You'll notice that they are all (except the 6 that were successfully printed earlier) completed exceptionally with a RejectedExecutionException
.
...
java.util.concurrent.CompletableFuture@2d8e6db6[Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniWhenComplete@3f91beef rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
java.util.concurrent.CompletableFuture@23ab930d[Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniWhenComplete@1a6c5a9e rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
java.util.concurrent.CompletableFuture@4534b60d[Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniWhenComplete@37bba400 rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With