I have written up a contrived code example, and it might not be code that someone ought to use, but I believe it should work. However it instead deadlocks. I've read the answers described here, but found them insufficient.
Here is the code example:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class Test {
public static void main(String argv[]) throws Exception {
int nThreads = 1;
Executor executor = Executors.newFixedThreadPool( nThreads );
CompletableFuture.completedFuture(true)
.thenComposeAsync((unused)->{
System.err.println("About to enqueue task");
CompletableFuture<Boolean> innerFuture = new CompletableFuture<>();
executor.execute(() -> {
// pretend this is some really expensive computation done asynchronously
System.err.println("Inner task");
innerFuture.complete(true);
});
System.err.println("Task enqueued");
return innerFuture;
}, executor).get();
System.err.println("All done");
System.exit(0);
}
}
This prints:
About to enqueue task
Task enqueued
And then it hangs. It's deadlocked because the executor only has a single thread, and it's waiting for the innerFuture to become redeemable. Why does "thenComposeAsync" block for its return value to become redeemable, instead of returning the still-incomplete future and freeing up its thread in the executor?
This feels completely unintuitive, and the javadocs don't really help. Am I fundamentally misunderstanding how CompletionStages work? Or is this a bug in the implementation?
What's a CompletableFuture? CompletableFuture is used for asynchronous programming in Java. Asynchronous programming is a means of writing non-blocking code by running a task on a separate thread than the main application thread and notifying the main thread about its progress, completion or failure.
runAsync. Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor after it runs the given action.
thenAcceptAsync() is an instance method in Java. It is used when we don't want to return anything from our callback function and only want to run some code once a Future completes. The thenAcceptAsync() method has access to the result of the CompletableFuture on which it is attached.
First, let me rewrite your code with 2 static functions to make it easier to see what's going on:
// Make an executor equivalent to Executors.newFixedThreadPool(nThreads)
// that will trace to standard error when a task begins or ends
static ExecutorService loggingExecutor(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.err.println("Executor beginning task on thread: "
+ t.getName());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.err.println("Executor finishing task on thread: "
+ Thread.currentThread().getName());
}
};
}
And
// same as what you pass to thenComposeAsync
static Function<Boolean, CompletableFuture<Boolean>> inner(Executor executor) {
return b -> {
System.err.println(Thread.currentThread().getName()
+ ": About to enqueue task");
CompletableFuture<Boolean> innerFuture = new CompletableFuture<>();
executor.execute(() -> {
System.err.println(Thread.currentThread().getName()
+ ": Inner task");
innerFuture.complete(true);
});
System.err.println(Thread.currentThread().getName()
+ ": Task enqueued");
return innerFuture;
};
}
Now we can write your test case as follows:
ExecutorService e = loggingExecutor(1);
CompletableFuture.completedFuture(true)
.thenComposeAsync(inner(e), e)
.join();
e.shutdown();
/* Output before deadlock:
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
*/
Let's test your conclusion that the first thread is not released until the result of the second future is computed:
ExecutorService e = loggingExecutor(2); // use 2 threads this time
CompletableFuture.completedFuture(true)
.thenComposeAsync(inner(e), e)
.join();
e.shutdown();
/*
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
Executor beginning task on thread: pool-1-thread-2
pool-1-thread-2: Inner task
Executor finishing task on thread: pool-1-thread-2
Executor finishing task on thread: pool-1-thread-1
*/
Indeed, it appears that thread 1 is held until thread 2 is done
Let's see if you are right that thenComposeAsync
itself blocks:
ExecutorService e = loggingExecutor(1);
CompletableFuture<Boolean> future =
CompletableFuture.completedFuture(true)
.thenComposeAsync(inner(e), e);
System.err.println("thenComposeAsync returned");
future.join();
e.shutdown();
/*
thenComposeAsync returned
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
*/
thenComposeAsync
didn't block. It returned the CompletableFuture
right away and the deadlock only occurred when we tried to complete it. So what should it take to complete the future returned by .thenComposeAsync(inner(e), e)
?
CompletableFuture<Boolean>
CompletableFuture<Boolean>
to also complete. Only then is the future complete. So, as you can see, it cannot do what you suggest and return the incomplete Future. Is it a bug? Why does the CompletionStage hold on to thread 1 while the inner task is being computed? It is not a bug becuase, as you noted, the documentation is pretty vague and doesn't promise to release threads in any particular order. Also, note that Thread1 will be used for any subsequent then*()
methods of CompletableFuture. Consider the following:
ExecutorService e = loggingExecutor(2);
CompletableFuture.completedFuture(true)
.thenComposeAsync(inner(e), e)
.thenRun(() -> System.err.println(Thread.currentThread().getName()
+ ": All done"))
.join();
e.shutdown();
/*
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
Executor beginning task on thread: pool-1-thread-2
pool-1-thread-2: Inner task
Executor finishing task on thread: pool-1-thread-2
pool-1-thread-1: All done
Executor finishing task on thread: pool-1-thread-1
*/
As you can see, .thenRun(...) got executed on thread 1. I believe this is consistent with other *Async(... , Executor exec) methods of CompletableFuture.
But what if you want to split up the functionality of thenComposeAsync
into 2 separately controllable steps instead of leaving it up to the API to juggle threads? You can just do this:
ExecutorService e = loggingExecutor(1);
completedFuture(true)
.thenApplyAsync(inner(e), e) // do the async part first
.thenCompose(x -> x) // compose separately
.thenRun(() -> System.err.println(Thread.currentThread().getName()
+ ": All done"))
.join();
e.shutdown();
Everything will run nicely on 1 thread with no deadlocks.
In conclusion, is this behavior of unintuitive as you say? I don't know. I cannot imagine why thenComposeAsync
even exists. If a method returns CompletableFuture
, it shouldn't block and there should be no reason to call it asynchronously.
So, after a lot of interesting conversation, I decided to email one of the JDK authors. Found out that this behavior wasn't intended, and is indeed a bug present in 1.8u25. There is a fix that will be released with a later patch version of Java 8. I don't know which. For anyone wanting to test the new behavior, you can download the latest jsr166 jar here:
http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With