
Why does thenComposeAsync await the return to be redeemable

I have written up a contrived code example, and it might not be code that someone ought to use, but I believe it should work. However it instead deadlocks. I've read the answers described here, but found them insufficient.

Here is the code example:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class Test {

    public static void main(String argv[]) throws Exception {
        
        int nThreads = 1;
        Executor executor = Executors.newFixedThreadPool( nThreads );
        

        
        CompletableFuture.completedFuture(true)
            .thenComposeAsync((unused)->{
            
                System.err.println("About to enqueue task");
                CompletableFuture<Boolean> innerFuture = new CompletableFuture<>();
                executor.execute(() -> {

                    // pretend this is some really expensive computation done asynchronously

                    System.err.println("Inner task");
                    innerFuture.complete(true);
                });
                System.err.println("Task enqueued");
                             
                return innerFuture;
            }, executor).get();

        System.err.println("All done");
        System.exit(0);

    }
    
}

This prints:

About to enqueue task

Task enqueued

And then it hangs. It's deadlocked because the executor only has a single thread, and it's waiting for the innerFuture to become redeemable. Why does "thenComposeAsync" block for its return value to become redeemable, instead of returning the still-incomplete future and freeing up its thread in the executor?

This feels completely unintuitive, and the javadocs don't really help. Am I fundamentally misunderstanding how CompletionStages work? Or is this a bug in the implementation?

asked Dec 18 '14 05:12 by sethwm


2 Answers

First, let me rewrite your code with 2 static functions to make it easier to see what's going on:

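import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Make an executor equivalent to Executors.newFixedThreadPool(nThreads)
// that will trace to standard error when a task begins or ends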
static ExecutorService loggingExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>()) {

                @Override
                protected void beforeExecute(Thread t, Runnable r) {
                    System.err.println("Executor beginning task on thread: " 
                       + t.getName());
                }

                @Override
                protected void afterExecute(Runnable r, Throwable t) {
                    System.err.println("Executor finishing task on thread: "
                       + Thread.currentThread().getName());
                }

            };
}

And

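import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

// same as what you pass to thenComposeAsync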
static Function<Boolean, CompletableFuture<Boolean>> inner(Executor executor) {
    return b -> {
        System.err.println(Thread.currentThread().getName() 
                   + ": About to enqueue task");
        CompletableFuture<Boolean> innerFuture = new CompletableFuture<>();
        executor.execute(() -> {
            System.err.println(Thread.currentThread().getName() 
                   + ": Inner task");
            innerFuture.complete(true);
        });
        System.err.println(Thread.currentThread().getName() 
                   + ": Task enqueued");

        return innerFuture;
    };
}

Now we can write your test case as follows:

ExecutorService e = loggingExecutor(1);

CompletableFuture.completedFuture(true)
        .thenComposeAsync(inner(e), e)
        .join();

e.shutdown();

/* Output before deadlock:
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
*/

Let's test your conclusion that the first thread is not released until the result of the second future is computed:

ExecutorService e = loggingExecutor(2);  // use 2 threads this time

CompletableFuture.completedFuture(true)
        .thenComposeAsync(inner(e), e)
        .join();

e.shutdown();

/*
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
Executor beginning task on thread: pool-1-thread-2
pool-1-thread-2: Inner task
Executor finishing task on thread: pool-1-thread-2
Executor finishing task on thread: pool-1-thread-1
*/

Indeed, it appears that thread 1 is held until thread 2 is done.

Let's see if you are right that thenComposeAsync itself blocks:

ExecutorService e = loggingExecutor(1);

CompletableFuture<Boolean> future = 
        CompletableFuture.completedFuture(true)
        .thenComposeAsync(inner(e), e);

System.err.println("thenComposeAsync returned");

future.join();

e.shutdown();

/*
thenComposeAsync returned
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
*/

thenComposeAsync didn't block. It returned the CompletableFuture right away, and the deadlock only became visible when we called join() to wait for that future to complete. So what does it take to complete the future returned by .thenComposeAsync(inner(e), e)?

  1. The API needs to wait for inner(e) to return a CompletableFuture<Boolean>.
  2. It needs to wait for the returned CompletableFuture<Boolean> to complete as well. Only then is the outer future complete.

So, as you can see, it cannot do what you suggest and hand back the still-incomplete future as its result.
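To make those two steps concrete, here is a minimal sketch that leaves executors out entirely. It uses plain thenCompose and completes the inner future by hand; the class and method names (ComposeCompletionSketch, observe) are made up for illustration and are not part of the code in the question.

```java
import java.util.concurrent.CompletableFuture;

class ComposeCompletionSketch {

    // Returns { outer.isDone() before inner completes, outer.isDone() after }.
    static boolean[] observe() {
        // Hypothetical stand-in for the future that inner(e) returns.
        CompletableFuture<Boolean> inner = new CompletableFuture<>();

        // The function has already returned here (step 1), but its
        // future has not completed yet (step 2 is still pending).
        CompletableFuture<Boolean> outer =
                CompletableFuture.completedFuture(true)
                        .thenCompose(unused -> inner);

        boolean before = outer.isDone();
        inner.complete(true);            // step 2 happens now
        boolean after = outer.isDone();
        return new boolean[] { before, after };
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println("outer done before inner completes: " + r[0]);
        System.out.println("outer done after inner completes:  " + r[1]);
    }
}
```

The point is only that the composed future tracks the inner one. It says nothing about which thread has to sit around in the meantime, which is the real question here.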

Is it a bug? Why does the CompletionStage hold on to thread 1 while the inner task is being computed? It is not a bug because, as you noted, the documentation is pretty vague and doesn't promise to release threads in any particular order. Also, note that thread 1 will be used for any subsequent then*() methods of CompletableFuture. Consider the following:

ExecutorService e = loggingExecutor(2);

CompletableFuture.completedFuture(true)
        .thenComposeAsync(inner(e), e)
        .thenRun(() -> System.err.println(Thread.currentThread().getName()
                         + ": All done"))
        .join();

e.shutdown();

/*
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
Executor beginning task on thread: pool-1-thread-2
pool-1-thread-2: Inner task
Executor finishing task on thread: pool-1-thread-2
pool-1-thread-1: All done
Executor finishing task on thread: pool-1-thread-1
*/

As you can see, .thenRun(...) got executed on thread 1. I believe this is consistent with other *Async(... , Executor exec) methods of CompletableFuture.

But what if you want to split up the functionality of thenComposeAsync into 2 separately controllable steps instead of leaving it up to the API to juggle threads? You can just do this:

ExecutorService e = loggingExecutor(1);

CompletableFuture.completedFuture(true)
        .thenApplyAsync(inner(e), e) // do the async part first
        .thenCompose(x -> x)         // compose separately
        .thenRun(() -> System.err.println(Thread.currentThread().getName()
                        + ": All done"))
        .join();

e.shutdown();

Everything will run nicely on 1 thread with no deadlocks.
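If you want to try the split version without the logging helpers, a self-contained sketch could look like the following. The class name SplitComposeSketch and the 5-second timeout are my own choices for illustration; the timeout is only there so that a mistake shows up as an exception rather than a hang.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SplitComposeSketch {

    // Runs the thenApplyAsync + thenCompose chain on a single-thread pool
    // and returns the value produced by the inner task.
    static boolean run() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            return CompletableFuture.completedFuture(true)
                    // Async part: the pool thread only creates the inner
                    // future and enqueues the work, then it is free again.
                    .thenApplyAsync(unused -> {
                        CompletableFuture<Boolean> inner = new CompletableFuture<>();
                        pool.execute(() -> inner.complete(true));
                        return inner;
                    }, pool)
                    // Compose part: flatten the nested future without
                    // occupying a pool thread while waiting.
                    .thenCompose(f -> f)
                    .get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException ex) {
            throw new IllegalStateException("chain did not complete", ex);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Result: " + run());
    }
}
```

The design choice is the same as above: the only thing that ever runs on the pool is short, non-blocking work, so one thread is enough.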

In conclusion, is this behavior as unintuitive as you say? I don't know. I cannot imagine why thenComposeAsync even exists. If a method returns a CompletableFuture, it shouldn't block, and there should be no reason to call it asynchronously.

answered Sep 19 '22 17:09 by Misha


So, after a lot of interesting conversation, I decided to email one of the JDK authors. It turns out that this behavior wasn't intended and is indeed a bug present in 1.8u25. A fix will be released with a later patch version of Java 8, though I don't know which one. For anyone wanting to test the new behavior, you can download the latest jsr166 jar here:

http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
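For checking whether a given runtime still shows the problem, something like the probe below might be handy. It is only a sketch under my own assumptions: the class and method names (ComposeAsyncProbe, completesOnSingleThread) are invented, the timeout is arbitrary, and the pool uses a daemon thread so that a stuck worker cannot keep the JVM alive. It reruns the single-thread thenComposeAsync case from the question and reports whether the result arrived in time.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ComposeAsyncProbe {

    // Returns true if the single-thread thenComposeAsync chain finishes
    // within the timeout, false if it appears to be stuck.
    static boolean completesOnSingleThread(long timeoutMillis) {
        // Daemon thread: a blocked worker must not prevent JVM exit.
        ExecutorService pool = Executors.newFixedThreadPool(1, r -> {
            Thread t = new Thread(r, "probe-worker");
            t.setDaemon(true);
            return t;
        });
        try {
            CompletableFuture<Boolean> outer =
                    CompletableFuture.completedFuture(true)
                            .thenComposeAsync(unused -> {
                                CompletableFuture<Boolean> inner = new CompletableFuture<>();
                                pool.execute(() -> inner.complete(true));
                                return inner;
                            }, pool);
            outer.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException ex) {
            return false;   // looks like the deadlock described in the question
        } catch (InterruptedException | ExecutionException ex) {
            throw new IllegalStateException("probe failed unexpectedly", ex);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("java.version = " + System.getProperty("java.version"));
        System.out.println("completes on one thread: " + completesOnSingleThread(2000));
    }
}
```

A timeout is not proof of a deadlock, of course; a slow or heavily loaded machine could also trip it, so treat a false result as a hint to look closer.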

answered Sep 21 '22 17:09 by sethwm