I am trying to convert List<CompletableFuture<T>> to CompletableFuture<List<T>>. This is quite useful when you have many asynchronous tasks and need the results of all of them.
If any of them fails, the final future fails. This is how I have implemented it:
public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
    if (com.isEmpty()) {
        throw new IllegalArgumentException();
    }
    Stream<? extends CompletableFuture<T>> stream = com.stream();
    CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
    return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
        x.add(y);
        return x;
    }, exec), exec), (a, b) -> a.thenCombineAsync(b, (ls1, ls2) -> {
        ls1.addAll(ls2);
        return ls1;
    }, exec));
}
To run it:
ExecutorService executorService = Executors.newCachedThreadPool();
Stream<CompletableFuture<Integer>> que = IntStream.range(0, 100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep((long) (Math.random() * 10));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return x;
}, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);
If any of them fails, then the resulting future fails. It gives the expected output even with a million futures. The problem I have is this: with more than 5000 futures, if any of them fails, I get a StackOverflowError:
Exception in thread "pool-1-thread-2611" java.lang.StackOverflowError
    at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
    at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)
    at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
    at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487)
What am I doing wrong?
Note: The future returned above fails as soon as any of the futures fails. The accepted answer should also take this into account.
Use CompletableFuture.allOf(...):
static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
            .thenApply(v -> com.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList())
            );
}
A few comments on your implementation:
Your use of .thenComposeAsync, .thenApplyAsync and .thenCombineAsync is likely not doing what you expect. These ...Async methods run the function supplied to them in a separate thread. So, in your case, you are causing the addition of the new item to the list to run in the supplied executor. There is no need to stuff light-weight operations into a cached thread executor. Do not use thenXXXXAsync methods without a good reason.
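A minimal sketch of that difference, assuming a single-thread executor whose thread is named "worker" (the class name, method names and thread name are made up for illustration). On an already completed future, the plain thenApply callback runs in the calling thread, while the thenApplyAsync callback is handed to the executor:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncVariantSketch {
    // Name of the thread that runs a thenApply callback on an already completed future.
    static String plainCallbackThread() {
        return CompletableFuture.completedFuture(1)
                .thenApply(x -> Thread.currentThread().getName())
                .join();
    }

    // Name of the thread that runs a thenApplyAsync callback on the given executor.
    static String asyncCallbackThread(ExecutorService exec) {
        return CompletableFuture.completedFuture(1)
                .thenApplyAsync(x -> Thread.currentThread().getName(), exec)
                .join();
    }

    public static void main(String[] args) {
        // Hypothetical executor with one named thread, so the thread is easy to recognise.
        ExecutorService exec = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            System.out.println(plainCallbackThread());     // name of the calling thread
            System.out.println(asyncCallbackThread(exec)); // worker
        } finally {
            exec.shutdown();
        }
    }
}
```

For something as cheap as adding one element to a list, the hand-off to another thread is pure overhead.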
Additionally, reduce should not be used to accumulate into mutable containers. Even though it might work correctly when the stream is sequential, it will fail if the stream is made parallel. To perform mutable reduction, use .collect instead.
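As a rough sketch of the .collect point (not the code from the question; the class and method names are hypothetical): the three-argument collect gives each parallel worker its own container and merges the containers afterwards, so no ArrayList is ever shared between threads. It assumes that blocking on join is acceptable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class MutableReductionSketch {
    // Blocks on every future and gathers the values with the three-argument collect():
    // a supplier of fresh containers, an accumulator, and a combiner that merges containers.
    static <T> List<T> joinAll(List<CompletableFuture<T>> futures) {
        ArrayList<T> values = futures.parallelStream()
                .map(CompletableFuture::join)
                .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
        return values;
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            futures.add(CompletableFuture.completedFuture(i));
        }
        System.out.println(joinAll(futures).size()); // 1000
    }
}
```

Because the list is an ordered source, the collected values keep the order of the input futures even though the stream is parallel.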
If you want to complete the entire computation exceptionally immediately after the first failure, do the following in your sequence method:
CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
        .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
        );
com.forEach(f -> f.whenComplete((t, ex) -> {
    if (ex != null) {
        result.completeExceptionally(ex);
    }
}));
return result;
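To see the fail-fast behaviour in isolation, here is a small self-contained sketch. The class name and the two hand-completed futures are assumptions made for the demonstration; the body of sequence is the snippet above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class FailFastSequence {
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
        CompletableFuture<List<T>> result =
                CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
                        .thenApply(v -> com.stream()
                                .map(CompletableFuture::join)
                                .collect(Collectors.toList()));
        // Propagate the first failure without waiting for the other futures.
        com.forEach(f -> f.whenComplete((t, ex) -> {
            if (ex != null) {
                result.completeExceptionally(ex);
            }
        }));
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> pending = new CompletableFuture<>(); // never completed
        CompletableFuture<Integer> failing = new CompletableFuture<>();
        CompletableFuture<List<Integer>> all = sequence(Arrays.asList(pending, failing));

        failing.completeExceptionally(new IllegalStateException("boom"));

        // The combined future has already failed although 'pending' is still open.
        System.out.println(all.isCompletedExceptionally()); // true
    }
}
```

With allOf alone, the combined future would stay incomplete until every input future had finished.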
If, additionally, you want to cancel the remaining operations on first failure, add a call to exec.shutdownNow() right after the call to result.completeExceptionally(ex). This, of course, assumes that exec only exists for this one computation. If it doesn't, you'll have to loop over and cancel each remaining Future individually.
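One possible way to write that loop, as a sketch rather than a definitive implementation (the class name is hypothetical). Note that cancelling a CompletableFuture only completes it with a CancellationException; it does not interrupt a task that is already running on the executor.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class CancelOnFailureSketch {
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
        CompletableFuture<List<T>> result =
                CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
                        .thenApply(v -> com.stream()
                                .map(CompletableFuture::join)
                                .collect(Collectors.toList()));
        com.forEach(f -> f.whenComplete((t, ex) -> {
            // completeExceptionally returns true only for the call that actually
            // completes 'result', so the cancel loop runs at most once.
            if (ex != null && result.completeExceptionally(ex)) {
                com.forEach(other -> other.cancel(true)); // no effect on completed futures
            }
        }));
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> pending = new CompletableFuture<>();
        CompletableFuture<Integer> failing = new CompletableFuture<>();
        CompletableFuture<List<Integer>> all = sequence(Arrays.asList(pending, failing));

        failing.completeExceptionally(new IllegalStateException("boom"));

        System.out.println(pending.isCancelled());          // true
        System.out.println(all.isCompletedExceptionally()); // true
    }
}
```

The cancellations trigger the other futures' callbacks as well, but those calls to completeExceptionally return false, so the loop does not recurse.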
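As Misha has pointed out, you are overusing …Async operations. Further, you are composing a complex chain of operations modelling a dependency which doesn’t reflect your program logic: each step of the reduction creates a job that depends on the job created by the previous step and on the next future in your list, so several thousand futures yield a dependency chain several thousand jobs deep.
Then, canceling (explicitly or due to an exception) this recursively composed job might be performed recursively and might fail with a StackOverflowError. That’s implementation-dependent.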
As already shown by Misha, there is a method, allOf, which allows you to model your original intention: to define one job which depends on all jobs of your list.
However, it’s worth noting that even that isn’t necessary. Since you are using an unbounded thread pool executor, you can simply post an asynchronous job collecting the results into a list and you are done. Waiting for the completion is implied by asking for the result of each job anyway.
ExecutorService executorService = Executors.newCachedThreadPool();
List<CompletableFuture<Integer>> que = IntStream.range(0, 100000)
        .mapToObj(x -> CompletableFuture.supplyAsync(() -> {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos((long) (Math.random() * 10)));
            return x;
        }, executorService)).collect(Collectors.toList());
CompletableFuture<List<Integer>> sequence = CompletableFuture.supplyAsync(
        () -> que.stream().map(CompletableFuture::join).collect(Collectors.toList()),
        executorService);
Using methods for composing dependent operations is important when the number of threads is limited and the jobs may spawn additional asynchronous jobs, to avoid having waiting jobs steal threads from jobs which have to complete first, but neither is the case here.
In this specific case, one job simply iterating over this large number of prerequisite jobs and waiting if necessary may be more efficient than modelling this large number of dependencies and having each job notify the dependent job about its completion.
You can get Spotify's CompletableFutures library and use its allAsList method. I think it's inspired by Guava's Futures.allAsList method.
public static <T> CompletableFuture<List<T>> allAsList(
List<? extends CompletionStage<? extends T>> stages) {
And here is a simple implementation if you don't want to use a library:
public <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) {
    return CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[futures.size()])
    ).thenApply(ignored ->
            futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
    );
}
To add to the accepted answer by @Misha, it can be further expanded as a collector:
public static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> sequenceCollector() {
    return Collectors.collectingAndThen(Collectors.toList(), com -> sequence(com));
}
Now you can:
Stream<CompletableFuture<Integer>> stream = Stream.of(
        CompletableFuture.completedFuture(1),
        CompletableFuture.completedFuture(2),
        CompletableFuture.completedFuture(3)
);
CompletableFuture<List<Integer>> ans = stream.collect(sequenceCollector());
An example sequence operation using thenCombine on CompletableFuture:
public <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    CompletableFuture<List<T>> identity = CompletableFuture.completedFuture(new ArrayList<T>());
    BiFunction<CompletableFuture<List<T>>, CompletableFuture<T>, CompletableFuture<List<T>>> combineToList =
            (acc, next) -> acc.thenCombine(next, (a, b) -> { a.add(b); return a; });
    BinaryOperator<CompletableFuture<List<T>>> combineLists =
            (a, b) -> a.thenCombine(b, (l1, l2) -> { l1.addAll(l2); return l1; });
    return com.stream()
            .reduce(identity,
                    combineToList,
                    combineLists);
}
If you don't mind using third-party libraries, cyclops-react (I am the author) has a set of utility methods for CompletableFutures (and Optionals, Streams, etc.):
List<CompletableFuture<String>> listOfFutures;
CompletableFuture<ListX<String>> sequence = CompletableFutures.sequence(listOfFutures);