Given:
public class Test
{
public static void main(String[] args)
{
int nThreads = 1;
Executor e = Executors.newFixedThreadPool(nThreads);
CompletableFuture.runAsync(() ->
{
System.out.println("Task 1. Thread: " + Thread.currentThread().getId());
}, e).thenComposeAsync((Void unused) ->
{
return CompletableFuture.runAsync(() ->
{
System.out.println("Task 2. Thread: " + Thread.currentThread().getId());
}, e);
}, e).join();
System.out.println("finished");
}
}
I am expecting a single executor thread to run task 1, followed by task 2. Instead, the code hangs if nThreads
is less than 2.
Future
to complete, but it's not clear why.In short, please help me understand how thenComposeAsync()
actually works. The Javadoc looks like it was written for robots instead of human beings :)
Java CompletableFuture supplyAsync Tutorial with Examples 1 Transform the result with thenApplyAsync callbacks. ... 2 Combine the results with thenComposeAsync and thenCombineAsync. ... 3 Await completion of independent CompletableFutures. ... 4 Complete the stage with thenAcceptAsync and thenRunAsync. ... 5 Conclusion. ...
The thenComposeAsync method places a new task for your executor that grabs the single thread and waits for your Task 2 to complete. But this one has no more threads to run.
The completableFuture.get () blocks until the completableFuture is complete and return the result Apart from using get (), you can also use its result to continuously execute other methods in the callbacks chain Let's walk through this tutorial to see the examples in practice
A CompletableFuture is executed asynchronously when the method typically ends with the keyword Async. By default (when no Executor is specified), asynchronous execution uses the common ForkJoinPool implementation, which uses daemon threads to execute the Runnable task. Note that this is specific to CompletableFuture.
The thenComposeAsync
method places a new task for your executor that grabs the single thread and waits for your Task 2
to complete. But this one has no more threads to run. You can instead use thenCompose
method that executes in the same thread as Task 1
to avoid the deadlock.
One thread is executing Task 1
and Task 2
and the second one is taking care of composing the results of the two.
Note: CompletableFuture
(s) work best with a ForkJoinPool
that is more efficient in processing tasks that spawn new tasks. The default ForkJoinPool
was added in Java 8 for this purpose and is used by default if you don't specify an executor to run your tasks.
Here is a good presentation about where these new features shines and how they work: Reactive Programming Patterns with Java 8 Futures.
It blocks on runAsync
that's inside thenComposeAsync
. thenComposeAsync
runs the supplied function in a thread inside executor e. But the function you gave it tries itself to execute the body of runAsync inside the same executor.
You can see better what's going on by adding another trace output:
CompletableFuture.runAsync(() -> {
System.out.println("Task 1. Thread: " + Thread.currentThread().getId());
}, e).thenComposeAsync((Void unused) -> {
System.out.println("Task 1 1/2. Thread: " + Thread.currentThread().getId());
return CompletableFuture.runAsync(() -> {
System.out.println("Task 2. Thread: " + Thread.currentThread().getId());
}, e);
}, e).join();
Now if you run it with a 2-thread executor, you will see that Task 1 1/2 and Task 2 run on different threads.
The way to fix it is to replace thenComposeAsync
with just regular thenCompose
. I am not sure why you would ever use thenComposeAsync
. If you have a method that returns a CompletableFuture
, presumably that method doesn't block and doesn't need to be run asynchronously.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With