How to use CompletableFuture.thenComposeAsync()?

Given:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class Test
{
    public static void main(String[] args)
    {
        int nThreads = 1;
        // Both tasks and the composing function are submitted to this one executor.
        Executor e = Executors.newFixedThreadPool(nThreads);

        CompletableFuture.runAsync(() ->
        {
            System.out.println("Task 1. Thread: " + Thread.currentThread().getId());
        }, e).thenComposeAsync((Void unused) ->
        {
            return CompletableFuture.runAsync(() ->
            {
                System.out.println("Task 2. Thread: " + Thread.currentThread().getId());
            }, e);
        }, e).join();
        System.out.println("finished");
    }
}

I am expecting a single executor thread to run task 1, followed by task 2. Instead, the code hangs if nThreads is less than 2.

  1. Please explain why the code hangs. I can see it is blocked in CompletableFuture:616 waiting for some Future to complete, but it's not clear why.
  2. If I allow the use of 2 threads, what is each thread being used for?

In short, please help me understand how thenComposeAsync() actually works. The Javadoc looks like it was written for robots instead of human beings :)

Gili asked Oct 26 '14 08:10




2 Answers

  1. thenComposeAsync submits a new task to your executor. That task takes the pool's only thread and waits for Task 2 to complete, but Task 2 is queued on the same executor and has no free thread to run on. You can instead use thenCompose, which runs the composing function on the same thread as Task 1 rather than submitting a separate task, and so avoids the deadlock.

  2. One thread executes Task 1 and Task 2, and the second one takes care of composing the results of the two.
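As a minimal sketch of the thenCompose suggestion in point 1, the question's chain can be run on a pool of any size, including one thread. The class name ComposeFix, the runChain helper and the event list are illustrative assumptions, not part of the original code; the pool is also shut down here so the JVM can exit.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ComposeFix {
    // Runs Task 1 then Task 2 on a pool of nThreads and returns the order in which they ran.
    static List<String> runChain(int nThreads) {
        ExecutorService e = Executors.newFixedThreadPool(nThreads);
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        try {
            CompletableFuture
                .runAsync(() -> events.add("task 1"), e)
                // Non-async thenCompose: the function is not submitted to e as a
                // separate task. It only schedules Task 2 and returns immediately,
                // so no pool thread is held while waiting for Task 2.
                .thenCompose(unused -> CompletableFuture.runAsync(() -> events.add("task 2"), e))
                .join();
        } finally {
            e.shutdown(); // lets the JVM exit once the queued work is done
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runChain(1));
        System.out.println("finished");
    }
}
```

Task 2 is only scheduled after Task 1 has completed, so the two events are always recorded in that order.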

Note: CompletableFutures work best with a ForkJoinPool, which is more efficient at processing tasks that spawn new tasks. The common pool (ForkJoinPool.commonPool()) was added in Java 8 for this purpose and is used by default if you don't specify an executor to run your tasks.
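To see which thread an async stage uses when no executor is passed, a small probe like the one below may help. The class and method names are made up for illustration. Per the CompletableFuture Javadoc, the common pool is used unless it does not support a parallelism level of at least two, in which case a new thread is created for each task, so the printed name can vary by machine.

```java
import java.util.concurrent.CompletableFuture;

class DefaultExecutorProbe {
    // Returns the name of the thread that ran a task submitted without an executor.
    static String defaultAsyncThreadName() {
        return CompletableFuture
            .supplyAsync(() -> Thread.currentThread().getName())
            .join();
    }

    public static void main(String[] args) {
        System.out.println("Caller thread:    " + Thread.currentThread().getName());
        System.out.println("No-executor task: " + defaultAsyncThreadName());
    }
}
```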

Here is a good presentation about where these new features shine and how they work: Reactive Programming Patterns with Java 8 Futures.

dcernahoschi answered Oct 03 '22 18:10


It blocks on the runAsync call inside thenComposeAsync. thenComposeAsync runs the supplied function on a thread from executor e, but the function you gave it in turn tries to execute the body of runAsync on that same executor.

You can see better what's going on by adding another trace output:

CompletableFuture.runAsync(() -> {
    System.out.println("Task 1. Thread: " + Thread.currentThread().getId());
}, e).thenComposeAsync((Void unused) -> {
    System.out.println("Task 1 1/2. Thread: " + Thread.currentThread().getId());
    return CompletableFuture.runAsync(() -> {
        System.out.println("Task 2. Thread: " + Thread.currentThread().getId());
    }, e);
}, e).join();

Now if you run it with a 2-thread executor, you will see that Task 1 1/2 and Task 2 run on different threads.

The way to fix it is to replace thenComposeAsync with just regular thenCompose. I am not sure why you would ever use thenComposeAsync. If you have a method that returns a CompletableFuture, presumably that method doesn't block and doesn't need to be run asynchronously.
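To illustrate that last point, here is a sketch built around a hypothetical lengthAsync method that returns a CompletableFuture without blocking. The class and method names are assumptions for the example only. It also shows, in a comment, how thenApply would nest the futures where thenCompose flattens them.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ComposeFlatten {
    // Hypothetical non-blocking method: it only schedules work on the executor
    // and returns the future right away.
    static CompletableFuture<Integer> lengthAsync(String s, ExecutorService e) {
        return CompletableFuture.supplyAsync(() -> s.length(), e);
    }

    static int composedLength(String input) {
        ExecutorService e = Executors.newFixedThreadPool(1);
        try {
            CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> input, e);

            // thenApply would wrap the inner future instead of flattening it:
            // CompletableFuture<CompletableFuture<Integer>> nested =
            //         first.thenApply(s -> lengthAsync(s, e));

            // thenCompose flattens, and the function returns without waiting,
            // so the single pool thread stays free to run lengthAsync's task.
            CompletableFuture<Integer> flat = first.thenCompose(s -> lengthAsync(s, e));
            return flat.join();
        } finally {
            e.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(composedLength("hello")); // prints 5
    }
}
```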

Misha answered Oct 03 '22 17:10