I have an input stream of queries that are executed asynchronously. I want to make sure that when I use CompletableFuture::join, the results of those queries are collected in the order of the input query stream.
This is what my code looks like:
queries.stream()
        .map(query -> CompletableFuture.supplyAsync(() -> {
            try {
                return SQLQueryEngine.execute(query);
            } catch (InternalErrorException e) {
                throw new RuntimeException(e);
            }
        }))
        .map(CompletableFuture::join)
        .collect(Collectors.toList());
SQLQueryEngine.execute(query) returns a List<Result>, so the output is a List<List<Result>>. I want to flatten and combine all the results into one single List. If I use .flatMap(List::stream) before collecting to flatten, will it maintain the ordering?
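You probably meant .flatMap, and yes, it will retain the ordering: a stream created from a List is ordered, and map, flatMap, and Collectors.toList() all preserve encounter order.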
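To make the ordering claim concrete, here is a minimal sketch of flattening nested lists with flatMap. The class name FlattenOrderDemo, the helper flatten, and the sample strings are made up for illustration; they stand in for the per-query result lists from the question.

```java
import java.util.List;
import java.util.stream.Collectors;

public class FlattenOrderDemo {
    // Flattens a list of lists into one list.
    // The stream comes from a List, so it is ordered, and flatMap plus
    // Collectors.toList() keep that encounter order.
    static <T> List<T> flatten(List<List<T>> nested) {
        return nested.stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical per-query results, one inner list per query.
        List<List<String>> perQuery = List.of(
                List.of("a1", "a2"),
                List.of("b1"),
                List.of("c1", "c2"));
        System.out.println(flatten(perQuery)); // prints [a1, a2, b1, c1, c2]
    }
}
```

The elements of the first inner list come first, then those of the second, and so on, which is the behaviour the question asks about.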
Consider explicitly passing an Executor to supplyAsync to avoid scheduling your IO-bound SQL queries in ForkJoinPool.commonPool().
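As a rough sketch of what passing an Executor looks like, the example below uses the two-argument overload supplyAsync(Supplier, Executor) with a fixed thread pool. The class name, the helper workerThreadName, and the pool size of 4 are assumptions for illustration only; the right pool size depends on your database and workload.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExplicitExecutorDemo {
    // Runs a task on the given executor and reports which thread executed it.
    // In real code the supplier would call SQLQueryEngine.execute(query) instead.
    static String workerThreadName(Executor executor) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), executor)
                .join();
    }

    public static void main(String[] args) {
        // Assumed pool size; tune it for your own IO-bound workload.
        ExecutorService ioPool = Executors.newFixedThreadPool(4);
        try {
            // The name comes from the dedicated pool, not ForkJoinPool.commonPool().
            System.out.println(workerThreadName(ioPool));
        } finally {
            ioPool.shutdown();
        }
    }
}
```

A dedicated pool keeps blocking queries from occupying the common pool's small number of worker threads, which other parts of the application may rely on.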
As @Ruben pointed out, you are joining each task in the current thread immediately after submitting it and before submitting the next query, which is likely a bug: the stream processes one element at a time, so the queries effectively run sequentially. You should submit all queries first and only then start joining.
You can do it like this (with a static import of toList):
queries.stream()
        .map(query -> CompletableFuture.supplyAsync(...))
        .collect(toList())
        .stream()
        .map(CompletableFuture::join)
        .collect(toList());
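Putting the pieces together, a runnable sketch of the submit-first, join-later approach might look like the following. Note that runQuery is a hypothetical stand-in for SQLQueryEngine.execute(query): it sleeps for queries whose name starts with "slow" so that tasks finish out of order, and the class name, pool size, and query strings are all invented for the demo.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class OrderedJoinDemo {
    // Hypothetical stand-in for SQLQueryEngine.execute(query).
    // "slow" queries sleep so that later queries can finish first.
    static List<String> runQuery(String query) {
        if (query.startsWith("slow")) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return List.of(query + "#1", query + "#2");
    }

    static List<String> runAll(List<String> queries, ExecutorService pool) {
        // Step 1: submit every query; collecting forces all submissions to happen now.
        List<CompletableFuture<List<String>>> futures = queries.stream()
                .map(q -> CompletableFuture.supplyAsync(() -> runQuery(q), pool))
                .collect(Collectors.toList());
        // Step 2: join in submission order, then flatten the per-query lists.
        return futures.stream()
                .map(CompletableFuture::join)
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // assumed size
        try {
            System.out.println(runAll(List.of("slow-a", "fast-b"), pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Even though "fast-b" completes before "slow-a", the results of "slow-a" come first in the output, because the futures are joined in the order they were submitted rather than the order they complete.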