
Java 8 maintain stream order with CompletableFuture::join

I have an input stream of queries that are executed asynchronously. I want to make sure that when I use CompletableFuture::join, the results of those queries are collected in the order of the input query stream.

This is what my code looks like:

queries.stream()
     .map(query -> CompletableFuture.supplyAsync(() -> {
                    try {
                        return SQLQueryEngine.execute(query);
                    } catch (InternalErrorException e) {
                        throw new RuntimeException(e);
                    }
     }))
     .map(CompletableFuture::join)
     .collect(Collectors.toList());

SQLQueryEngine.execute(query) returns a List<Result>, so the output is a List<List<Result>>. I want to flatten and combine all the results into a single List. If I use .flatMap(List::stream) before collecting, will it maintain the ordering?

premprakash asked Feb 09 '23 17:02



1 Answer

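You probably meant .flatMap, and yes, it will retain the ordering. As long as queries is an ordered source such as a List and the stream is sequential, map, flatMap and collect(toList()) all preserve encounter order, regardless of which query finishes first.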

Consider explicitly passing an Executor to supplyAsync to avoid scheduling your IO-bound SQL queries in ForkJoinPool.commonPool().
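A minimal sketch of the two-argument supplyAsync overload follows. The runQuery method is a hypothetical stand-in for SQLQueryEngine.execute, and the pool size and the thread name "sql-io" are arbitrary assumptions chosen only to make the effect visible:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DedicatedExecutorExample {

    // Hypothetical stand-in for SQLQueryEngine.execute(query):
    // it reports which thread ran the query.
    static List<String> runQuery(String query) {
        return Arrays.asList(query + " ran on " + Thread.currentThread().getName());
    }

    static List<String> runOnIoPool(String query) {
        // Pool size 4 and the thread name "sql-io" are arbitrary assumptions.
        ExecutorService ioPool = Executors.newFixedThreadPool(4, r -> {
            Thread t = new Thread(r, "sql-io");
            t.setDaemon(true);
            return t;
        });
        try {
            // The second argument keeps the task off ForkJoinPool.commonPool().
            return CompletableFuture.supplyAsync(() -> runQuery(query), ioPool).join();
        } finally {
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnIoPool("q1")); // prints [q1 ran on sql-io]
    }
}
```

In a real application the pool would normally be created once and shared across calls rather than built and shut down per query; it is done inline here only to keep the sketch self-contained.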

As @Ruben pointed out, you are joining each task in the current thread immediately after submitting it and before submitting the next query, which is likely a bug. You should submit all queries first and only then start joining.
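To see why, here is a small trace of a map-then-join pipeline of the same shape as the one in the question. The query names are placeholders and the async task just returns its input; the point is only the order in which the calling thread submits and joins:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyJoinDemo {

    // Records when each query is submitted and joined in a single map -> map pipeline.
    static List<String> traceSinglePipeline() {
        List<String> events = new ArrayList<>(); // only touched by the calling thread
        Stream.of("q1", "q2", "q3")
                .map(q -> {
                    events.add("submit " + q);
                    return CompletableFuture.supplyAsync(() -> q);
                })
                .map(f -> {
                    String result = f.join(); // blocks before the next element is pulled
                    events.add("join " + result);
                    return result;
                })
                .collect(Collectors.toList());
        return events;
    }

    public static void main(String[] args) {
        // prints [submit q1, join q1, submit q2, join q2, submit q3, join q3]
        System.out.println(traceSinglePipeline());
    }
}
```

Because a sequential stream pushes each element through every stage before pulling the next one, the queries effectively run one at a time.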

You can do it like this (with static import of toList):

queries.stream()
    .map(query -> CompletableFuture.supplyAsync(...))
    .collect(toList())
    .stream()
    .map(CompletableFuture::join)
    .collect(toList());
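Putting the pieces together, here is a hedged, self-contained sketch of the submit-all-then-join pattern with the flatMap step from the question added. The runQuery method and its artificial delays are hypothetical stand-ins for SQLQueryEngine.execute; earlier queries sleep longer so that later ones will typically finish first, yet the flattened result still follows the input order:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class OrderedJoinExample {

    // Hypothetical stand-in for SQLQueryEngine.execute(query).
    // Lower ids sleep longer, so they tend to complete last.
    static List<String> runQuery(int id) {
        try {
            Thread.sleep(Math.max(0, 150 - 50L * id));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return Arrays.asList("q" + id + "-a", "q" + id + "-b");
    }

    static List<String> runAllInOrder(List<Integer> ids) {
        // Step 1: submit every query; collecting forces all submissions to happen now.
        // The default executor is used here only to keep the sketch short.
        List<CompletableFuture<List<String>>> futures = ids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> runQuery(id)))
                .collect(Collectors.toList());

        // Step 2: join in input order, then flatten the per-query lists.
        return futures.stream()
                .map(CompletableFuture::join)
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [q0-a, q0-b, q1-a, q1-b, q2-a, q2-b]
        System.out.println(runAllInOrder(Arrays.asList(0, 1, 2)));
    }
}
```

The order of the output is determined by the order of the futures in the intermediate list, not by when each future completes.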
Misha answered Feb 14 '23 02:02
