This code is a quoted from Java 8 in Action, which is also in the book 11.4.3.
public Stream<CompletableFuture<String>> findPricesStream(String product) {
return shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));
}
Along the code, the writer enclose a figure as follows expressing that the applyDiscount()
works in the same thread with getPrice()
, which I strongly have a doubt: there are two different Async suffix here which means the second call should be in another thread.
I tested it locally with the following code:
private static void testBasic() {
out.println("*****************************************");
out.println("********** TESTING thenCompose **********");
CompletableFuture[] futures = IntStream.rangeClosed(0, LEN).boxed()
.map(i -> CompletableFuture.supplyAsync(() -> runStage1(i), EXECUTOR_SERVICE))
.map(future -> future.thenCompose(i -> CompletableFuture.supplyAsync(() -> runStage2(i), EXECUTOR_SERVICE)))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();
}
The output further demonstrate my thought, is it correct?
*****************************************
********** TESTING thenCompose **********
Start: stage - 1 - value: 0 - thread name: pool-1-thread-1
Start: stage - 1 - value: 1 - thread name: pool-1-thread-2
Start: stage - 1 - value: 2 - thread name: pool-1-thread-3
Start: stage - 1 - value: 3 - thread name: pool-1-thread-4
Finish: stage - 1 - value: 3 - thread name: pool-1-thread-4 - time cost: 1520
Start: stage - 2 - value: 3 - thread name: pool-1-thread-5
Finish: stage - 1 - value: 0 - thread name: pool-1-thread-1 - time cost: 1736
Start: stage - 2 - value: 0 - thread name: pool-1-thread-6
Finish: stage - 1 - value: 2 - thread name: pool-1-thread-3 - time cost: 1761
Start: stage - 2 - value: 2 - thread name: pool-1-thread-7
Finish: stage - 2 - value: 2 - thread name: pool-1-thread-7 - time cost: 446
Finish: stage - 1 - value: 1 - thread name: pool-1-thread-2 - time cost: 2249
Start: stage - 2 - value: 1 - thread name: pool-1-thread-8
Finish: stage - 2 - value: 3 - thread name: pool-1-thread-5 - time cost: 828
Finish: stage - 2 - value: 0 - thread name: pool-1-thread-6 - time cost: 704
Finish: stage - 2 - value: 1 - thread name: pool-1-thread-8 - time cost: 401
The Java 8 in Action
is wrong about this?
Thank you, @Holger. You make it crystal clear to me now about the executing thread for async and non-async methods. Especially after checking its specification further demonstrating your point.
Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.
As a first note, that code is distracting from what’s happening due to the unnecessary splitting into multiple Stream operations.
Further, there is no sense in doing
future.thenCompose(quote ->
CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor))
instead of
future.thenApplyAsync(quote -> Discount.applyDiscount(quote), executor)
So, a simpler example doing the same would be
public Stream<CompletableFuture<String>> findPricesStream(String product) {
return shops.stream().map(
shop -> CompletableFuture
.supplyAsync(() -> shop.getPrice(product), executor)
.thenApply(Quote::parse)
.thenApplyAsync(quote -> Discount.applyDiscount(quote), executor));
}
However, you are right, there is no guaranty that getPrice
and applyDiscount
run in the same thread—unless the executor is a single threaded executor.
You may interpret “executor thread” as “one of the executor’s threads”, but even then, there in a dangerously wrong point in the diagram, namely, “new Quote(price)
”, which apparently actually means “Quote::parse
”. That step does not belong to the right side, as the actual thread evaluating the function passed to thenApply
is unspecified. It may be one of the executor’s threads upon completion of the previous stage, but it may also be “your thread” right when calling thenApply
, e.g. if the asynchronous operation managed to complete in‑between.
The CompletableFuture
offers no way to enforce the use of the first stage’s completing thread for the dependent actions.
Unless you use a simple sequential code instead, of course:
public Stream<CompletableFuture<String>> findPricesStream(String product) {
return shops.stream().map(shop -> CompletableFuture
.supplyAsync(() -> Discount.applyDiscount(Quote.parse(shop.getPrice(product))), executor));
}
Then, the picture of a linear thread on the right hand side will be correct.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With