
How to Fan Out Inside Chained CompletableFuture?

I want to chain a CompletableFuture such that it fans out in the middle of processing. By this I mean I have a CompletableFuture that produces a list, and I want to apply a computation to each item in that list.

The first step is to call m_myApi.getResponse(request, executor) which issues an async call.

The result of that async call has a getCandidates method. I want to parse all of those candidates in parallel.

Currently, my code parses them all serially:

public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(@Nonnull final REQUEST request, @Nonnull final Executor executor)
{
        CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
        return candidates.thenApplyAsync(response -> response.getCandidates()
                                                   .stream()
                                                   .map(MyParser::ParseCandidates)
                                                   .collect(Collectors.toList()));
}

I want something like this:

public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(@Nonnull final REQUEST request, @Nonnull final Executor executor)
{
        CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
        return candidates.thenApplyAsync(response -> response.getCandidates()
                                                   .stream()
                                                   .PARSE_IN_PARALLEL_USING_EXECUTOR
}
user3899879 asked Oct 25 '18 21:10



1 Answer

As explained in this answer, if the Executor happens to be a Fork/Join pool, there is an (undocumented) feature: a parallel stream started from one of the pool's worker threads performs the parallel operation using that same pool.
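A minimal sketch of that Fork/Join case, assuming the executor is known to be a ForkJoinPool. The class name ForkJoinFanOut and the parse method are hypothetical stand-ins for the question's MyParser::ParseCandidates, and the pool affinity of the parallel stream is the undocumented behavior mentioned above, so it should not be treated as a guarantee:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

class ForkJoinFanOut {
    // Hypothetical stand-in for MyParser::ParseCandidates.
    static int parse(String candidate) {
        return candidate.length();
    }

    static CompletableFuture<List<Integer>> parseAll(List<String> candidates, ForkJoinPool pool) {
        // The supplier runs on a worker thread of `pool`. A parallel stream started
        // from such a thread is, in current JDK implementations, processed by that
        // same pool (undocumented behavior, not a specified contract).
        return CompletableFuture.supplyAsync(
            () -> candidates.parallelStream()
                            .map(ForkJoinFanOut::parse)
                            .collect(Collectors.toList()),
            pool);
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            System.out.println(parseAll(Arrays.asList("a", "bb", "ccc"), pool).join());
        } finally {
            pool.shutdown();
        }
    }
}
```

The collected list keeps the encounter order of the source list, even though the individual parse calls may run concurrently.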

When you want to support arbitrary Executor implementations, things are more complicated. One solution looks like this:

public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(
       @Nonnull final REQUEST request, @Nonnull final Executor executor)
{
    CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
    return candidates.thenComposeAsync(
        response -> {
            List<CompletableFuture<DOMAIN_OBJECT>> list = response.getCandidates()
                .stream()
                .map(CompletableFuture::completedFuture)
                .map(f -> f.thenApplyAsync(MyParser::ParseCandidates, executor))
                .collect(Collectors.toList());
            return CompletableFuture.allOf(list.toArray(new CompletableFuture<?>[0]))
                .thenApplyAsync(x ->
                    list.stream().map(CompletableFuture::join).collect(Collectors.toList()),
                    executor);
        },
        executor);
}

The first crucial thing is that we have to submit all potentially asynchronous jobs before starting to wait on any, to enable the maximum parallelism the executor might support. Hence, we have to collect all futures in a List in a first step.

In the second step, we could just iterate over the list and join all futures. If the executor is a Fork/Join pool and a future has not completed yet, the pool would detect this and start a compensation thread to regain the configured parallelism. However, for arbitrary executors, we cannot assume such a feature. Most notably, if the executor is a single-threaded executor, blocking its only thread in join could lead to a deadlock, because the jobs being waited for would never get a thread to run on.
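The deadlock risk can be illustrated with a small sketch. Everything here is hypothetical (the class BlockingJoinDemo, the parse stand-in, the timeout); it only demonstrates that joining inside the only worker thread of a single-threaded executor never finishes, detected via a timeout:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

class BlockingJoinDemo {
    // Hypothetical stand-in for MyParser::ParseCandidates.
    static int parse(String candidate) {
        return candidate.length();
    }

    /** Returns true if the blocking variant did not finish within the timeout. */
    static boolean stallsOnSingleThread(long timeoutMillis) {
        // Daemon thread, so the permanently blocked worker cannot keep the JVM alive.
        ExecutorService single = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        });
        List<String> candidates = Arrays.asList("a", "bb", "ccc");
        CompletableFuture<List<Integer>> result = CompletableFuture.supplyAsync(() -> {
            // Submitting all jobs first is fine: they are queued on the executor.
            List<CompletableFuture<Integer>> futures = candidates.stream()
                .map(c -> CompletableFuture.supplyAsync(() -> parse(c), single))
                .collect(Collectors.toList());
            // join() blocks the executor's only thread, so the queued jobs never run.
            return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        }, single);
        try {
            result.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            single.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("stalled: " + stallsOnSingleThread(500));
    }
}
```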

Therefore, the solution uses CompletableFuture.allOf to perform the operation of iterating and joining all futures only when all of them have been completed already. As a result, this solution will never block an executor’s thread, making it compatible with any Executor implementation.
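For reference, here is a self-contained sketch of the same pattern that can be run as-is. The types from the question are replaced by hypothetical stand-ins (a List<String> for the response's candidates, parse for MyParser::ParseCandidates, the class name AllOfFanOut), and a single-threaded executor is used to show that no thread is ever blocked:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class AllOfFanOut {
    // Hypothetical stand-in for MyParser::ParseCandidates.
    static int parse(String candidate) {
        return candidate.length();
    }

    static CompletableFuture<List<Integer>> parseAll(
            CompletableFuture<List<String>> response, Executor executor) {
        return response.thenComposeAsync(candidates -> {
            // Step 1: submit every parse job before waiting on any of them.
            List<CompletableFuture<Integer>> futures = candidates.stream()
                .map(c -> CompletableFuture.supplyAsync(() -> parse(c), executor))
                .collect(Collectors.toList());
            // Step 2: join only once allOf has completed, so join() returns immediately.
            return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApplyAsync(
                    done -> futures.stream()
                                   .map(CompletableFuture::join)
                                   .collect(Collectors.toList()),
                    executor);
        }, executor);
    }

    public static void main(String[] args) {
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<List<String>> response =
                CompletableFuture.completedFuture(Arrays.asList("a", "bb", "ccc"));
            System.out.println(parseAll(response, single).join()); // prints [1, 2, 3]
        } finally {
            single.shutdown();
        }
    }
}
```

Because the function passed to thenComposeAsync returns a future instead of waiting for one, the executor's thread is released right after the jobs are submitted, which is what lets the single-threaded case complete.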

Holger answered Oct 12 '22 22:10
