
CompletableFuture.allOf(..).join() vs CompletableFuture.join()

I am currently using CompletableFuture's supplyAsync() method to submit some tasks to the common thread pool. Here is what the code snippet looks like:

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
        .map(resolver -> supplyAsync(() -> task.doWork()))
        .collect(toList());

CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).join();

final List<Test> tests = new ArrayList<>();
completableFutures.stream()
        .map(completableFuture -> completableFuture.getNow())
        .forEach(tests::addAll);

I would like to know how the code below differs from the code above. I removed the parent CompletableFuture and called join() on each CompletableFuture instead of getNow():

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
        .map(resolver -> supplyAsync(() -> task.doWork()))
        .collect(toList());

final List<Test> tests = new ArrayList<>();
completableFutures.stream()
        .map(completableFuture -> completableFuture.join())
        .forEach(tests::addAll);

I am using this in a Spring service and there are issues with thread pool exhaustion. Any pointers are deeply appreciated.

G Shenoy asked Sep 17 '18 15:09



People also ask

What is join in CompletableFuture?

The CompletableFuture.join() method is similar to the get() method, but it throws an unchecked exception (CompletionException) if the future does not complete normally. Because it declares no checked exceptions, it can be used as a method reference in Stream.map().
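As a minimal sketch of that difference (the class and method names here are hypothetical, chosen only for illustration), the following contrasts the exception types thrown by join() and get() for a future that failed, and shows join() used as a method reference in a stream:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

// Hypothetical demo class; not part of the original question or answer.
class JoinVersusGet {

    // join() declares no checked exceptions, so it fits directly into Stream.map().
    static List<Integer> joinAll(List<CompletableFuture<Integer>> futures) {
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    // Name of the exception type that join() throws for an exceptionally completed future.
    static String joinFailureType() {
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        try {
            failed.join();
            return "none";
        } catch (CompletionException e) { // unchecked
            return e.getClass().getSimpleName();
        }
    }

    // Name of the exception type that get() throws for the same failure.
    static String getFailureType() {
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        try {
            failed.get();
            return "none";
        } catch (ExecutionException | InterruptedException e) { // checked
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(joinAll(Arrays.asList(
                CompletableFuture.completedFuture(1),
                CompletableFuture.completedFuture(2))));
        System.out.println(joinFailureType());
        System.out.println(getFailureType());
    }
}
```

In both cases the original IllegalStateException is available through getCause().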

What is CompletableFuture allOf?

allOf() is a static method of the CompletableFuture class. It immediately returns a new CompletableFuture<Void> that completes when all of the specified CompletableFutures have completed.
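A small sketch may make the timing clearer. It assumes two manually controlled futures (the class and method names are hypothetical) and checks whether the combined future is done before and after the last input completes:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; not part of the original question or answer.
class AllOfDemo {

    // Returns {doneBefore, doneAfter} for the future produced by allOf.
    static boolean[] doneBeforeAndAfter() {
        CompletableFuture<String> first = CompletableFuture.completedFuture("a");
        CompletableFuture<String> second = new CompletableFuture<>(); // still pending

        // allOf returns right away; it carries no result, hence CompletableFuture<Void>.
        CompletableFuture<Void> all = CompletableFuture.allOf(first, second);
        boolean before = all.isDone();

        second.complete("b"); // completing the last input completes the combined future
        boolean after = all.isDone();

        return new boolean[] {before, after};
    }

    public static void main(String[] args) {
        boolean[] result = doneBeforeAndAfter();
        System.out.println("done before: " + result[0] + ", done after: " + result[1]);
    }
}
```

Because the combined future holds no value, the individual results still have to be read from the input futures.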

Is join on CompletableFuture blocking?

The answer is that join() may block the thread, but if this happens inside a worker thread of a Fork/Join pool, this situation will be detected and a new compensation thread will be started, to ensure the configured target parallelism.

What is the difference between runAsync and supplyAsync?

The difference between runAsync() and supplyAsync() is that the former takes a Runnable and returns a CompletableFuture<Void>, while supplyAsync() takes a Supplier and returns a CompletableFuture holding the supplied value. Both methods also accept an optional second argument: a custom Executor to submit tasks to.
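The following sketch illustrates both methods with an explicit Executor. The fixed pool of two threads and all names are assumptions made for the example, not something taken from the question:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the original question or answer.
class RunVersusSupply {

    static int demo() {
        // Assumed custom pool; without this argument both methods use the common pool.
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            AtomicInteger sideEffect = new AtomicInteger();

            // runAsync takes a Runnable: useful only for its side effect.
            CompletableFuture<Void> run =
                    CompletableFuture.runAsync(() -> sideEffect.set(40), executor);

            // supplyAsync takes a Supplier: the future carries the returned value.
            CompletableFuture<Integer> supply =
                    CompletableFuture.supplyAsync(() -> 2, executor);

            run.join(); // returns null once the Runnable has finished
            return sideEffect.get() + supply.join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```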


1 Answer

First of all, .getNow() does not work, as this method requires a fall-back value as an argument for the case where the future has not completed yet. Since you are assuming the future to be completed here, you should also use join().
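To illustrate the getNow(fallback) signature, here is a minimal sketch with hypothetical names. It shows that the fall-back is returned only when the future is still pending:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; not part of the original question or answer.
class GetNowDemo {

    // A pending future yields the fall-back value; getNow never blocks.
    static String fromPending() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        return pending.getNow("fallback");
    }

    // A completed future yields its own result and ignores the fall-back.
    static String fromCompleted() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("value");
        return done.getNow("fallback");
    }

    public static void main(String[] args) {
        System.out.println(fromPending());
        System.out.println(fromCompleted());
    }
}
```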

Then, there is no difference regarding thread exhaustion: in either case, you are waiting for the completion of all jobs before proceeding, potentially blocking the current thread.

The only way to avoid that is to refactor the code so that it does not expect a result synchronously, but rather schedules the subsequent processing action to be done when all jobs have been completed. Then, using allOf becomes relevant:

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
    .map(resolver -> supplyAsync(() -> task.doWork()))
    .collect(toList());

CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]))
    .thenAccept(justVoid -> {
        // here, all jobs have been completed
        final List<Test> tests = completableFutures.stream()
            .flatMap(completableFuture -> completableFuture.join().stream())
            .collect(toList());
        // process the result here
    });
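For experimenting with this pattern outside the service, a self-contained variant might look like the sketch below. It is an assumption-laden illustration, not the code from the question: the class name, the collectAll helper and the Supplier-based tasks are hypothetical stand-ins for resolvers and task.doWork(), and it uses thenApply instead of thenAccept so that the combined list is handed back as a future the caller can chain on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical demo class; not part of the original question or answer.
class NonBlockingCollect {

    // Merges the per-task lists into one future list without blocking the caller.
    static <T> CompletableFuture<List<T>> collectAll(List<Supplier<List<T>>> tasks) {
        List<CompletableFuture<List<T>>> futures = tasks.stream()
                .map(task -> CompletableFuture.supplyAsync(task)) // common pool, as in the question
                .collect(Collectors.toList());

        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> futures.stream()
                        // all futures are complete here, so join() returns immediately
                        .flatMap(future -> future.join().stream())
                        .collect(Collectors.toList()));
    }

    // Small driver with two stand-in tasks.
    static List<Integer> demo() {
        List<Supplier<List<Integer>>> tasks = new ArrayList<>();
        tasks.add(() -> Arrays.asList(1, 2));
        tasks.add(() -> Arrays.asList(3));
        // join() is used only so the demo can return the value;
        // real callers would chain further stages instead of blocking.
        return collectAll(tasks).join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The results keep the order of the task list, because the stream walks the futures in that order regardless of which task finished first.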

By the way, regarding the toArray method on collections, I recommend reading Arrays of Wisdom of the Ancients…

Holger answered Sep 20 '22 13:09
