I want to chain a CompletableFuture such that it fans out in the middle of processing. By this I mean I have an open CompletableFuture against a list, and I want to apply a computation against each item in that list.
The first step is to call m_myApi.getResponse(request, executor) which issues an async call.
The result of that async call has a getCandidates method. I want to parse all of those candidates in parallel.
Currently, my code parses them all serially:
    public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(@Nonnull final REQUEST request, @Nonnull final Executor executor)
    {
        CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
        return candidates.thenApplyAsync(response -> response.getCandidates()
                .stream()
                .map(MyParser::ParseCandidates)
                .collect(Collectors.toList()));
    }
I want something like this:
    public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(@Nonnull final REQUEST request, @Nonnull final Executor executor)
    {
        CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
        return candidates.thenApplyAsync(response -> response.getCandidates()
                .stream()
                .PARSE_IN_PARALLEL_USING_EXECUTOR
    }
As said in this answer, if the Executor happens to be a Fork/Join pool, there is the (undocumented) feature that commencing a parallel stream in one of its worker threads will perform the parallel operation using that executor.
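A minimal sketch of that Fork/Join case, assuming the executor really is a ForkJoinPool. The class ForkJoinFanOutSketch and the parse method are hypothetical stand-ins for MyParser::ParseCandidates, and the behaviour relied upon is the undocumented one described above, so treat it as an illustration rather than a guarantee:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

class ForkJoinFanOutSketch {
    // Hypothetical stand-in for MyParser::ParseCandidates.
    static int parse(String candidate) {
        return candidate.length();
    }

    // Starts the parallel stream from inside a worker thread of the given pool.
    // In practice the stream's subtasks are then forked into that same pool,
    // but this is implementation behaviour, not a documented contract.
    static List<Integer> parseAll(List<String> candidates, ForkJoinPool pool) {
        return CompletableFuture
                .supplyAsync(() -> candidates.parallelStream()
                        .map(ForkJoinFanOutSketch::parse)
                        .collect(Collectors.toList()), pool)
                .join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            System.out.println(parseAll(Arrays.asList("a", "bb", "ccc"), pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

The result keeps the encounter order of the input list, because Collectors.toList() on an ordered stream preserves it even when the stream runs in parallel.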
When you want to support arbitrary Executor implementations, things are more complicated. One solution looks like:
    public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(
        @Nonnull final REQUEST request, @Nonnull final Executor executor)
    {
        CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
        return candidates.thenComposeAsync(
            response -> {
                // Step 1: submit one parse job per candidate before waiting on any of them.
                List<CompletableFuture<DOMAIN_OBJECT>> list = response.getCandidates()
                    .stream()
                    .map(CompletableFuture::completedFuture)
                    .map(f -> f.thenApplyAsync(MyParser::ParseCandidates, executor))
                    .collect(Collectors.toList());
                // Step 2: collect the results only after every job has completed,
                // so the join() calls below never block.
                return CompletableFuture.allOf(list.toArray(new CompletableFuture<?>[0]))
                    .thenApplyAsync(x ->
                        list.stream().map(CompletableFuture::join).collect(Collectors.toList()),
                        executor);
            },
            executor);
    }
The first crucial thing is that we have to submit all potentially asynchronous jobs before starting to wait on any, to enable the maximum parallelism the executor might support. Hence, we have to collect all futures in a List in a first step.
In the second step, we could just iterate over the list and join all futures. If the executor is a Fork/Join pool and the future has not completed yet, it would detect this and start a compensation thread to regain the configured parallelism. However, for arbitrary executors, we cannot assume such a feature. Most notably, if the executor is a single thread executor, this could lead to a deadlock.
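The single-thread hazard can be made visible with a small sketch. The class BlockingJoinSketch and its method are hypothetical, and a timed get stands in for join so that the demonstration terminates instead of hanging; with a plain join the outer task would wait forever:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BlockingJoinSketch {
    // Returns true if waiting for the inner job from inside the executor's
    // only thread timed out, i.e. the inner job never got a chance to run.
    static boolean innerWaitTimesOut() {
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture.supplyAsync(() -> {
                // The inner job is queued behind the task that is currently running.
                CompletableFuture<String> inner =
                        CompletableFuture.supplyAsync(() -> "parsed", single);
                try {
                    // A plain inner.join() here would block the only thread forever.
                    inner.get(200, TimeUnit.MILLISECONDS);
                    return false;
                } catch (TimeoutException e) {
                    return true;
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            }, single).join();
        } finally {
            single.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("inner wait timed out: " + innerWaitTimesOut());
    }
}
```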
Therefore, the solution uses CompletableFuture.allOf to perform the operation of iterating and joining all futures only when all of them have been completed already. Therefore, this solution will never block an executor’s thread, making it compatible with any Executor implementation.
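The same pattern can be pulled into a reusable helper. This is a sketch under assumptions, not part of the original code: the names FanOutSketch, mapAllAsync and demo are made up for illustration, and String::length stands in for the real parser. It runs on a single thread executor to show that nothing blocks:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

class FanOutSketch {
    // Submits one job per item, then gathers the results once all jobs are done.
    static <T, R> CompletableFuture<List<R>> mapAllAsync(
            List<T> items, Function<T, R> fn, Executor executor) {
        // Submit everything first so the executor can run jobs concurrently.
        List<CompletableFuture<R>> futures = items.stream()
                .map(item -> CompletableFuture.<R>supplyAsync(() -> fn.apply(item), executor))
                .collect(Collectors.toList());
        // allOf completes only when every future has completed, so the
        // join() calls in the dependent stage return immediately.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApplyAsync(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()),
                        executor);
    }

    // Runs the helper on a single thread executor, the case that would
    // deadlock if a worker thread blocked on an unfinished future.
    static List<Integer> demo() {
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<List<Integer>> all =
                    mapAllAsync(Arrays.asList("a", "bb", "ccc"), String::length, single);
            return all.join(); // blocks the caller's thread, not the executor's
        } finally {
            single.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only the caller of demo waits here; the executor's thread merely runs the three jobs and then the short collecting stage.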