 

Java collecting results of CompletableFuture from multiple calls

I have to run multiple external calls and then collect the results in the form of a list. I decided to use the CompletableFuture API, and the code I came up with is pretty disgusting:

The example:

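import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Main {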
    public static void main(String[] args) {
        String prefix = "collection_";

        List<CompletableFuture<User>> usersResult = IntStream.range(1, 10)
                .boxed()
                .map(num -> prefix.concat("" + num))
                .map(name -> CompletableFuture.supplyAsync(
                        () -> callApi(name)))
                .collect(Collectors.toList());

        try {
            CompletableFuture.allOf(usersResult.toArray(new CompletableFuture[usersResult.size()])).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        List<User> users = usersResult //the result I need
                .stream()
                .map(userCompletableFuture -> {
                    try {
                        return userCompletableFuture.get();
                    } catch (InterruptedException | ExecutionException e) {
                        e.printStackTrace();
                    }
                    return null;
                })
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    private static User callApi(String collection) {
        return new User(); //potentially time-consuming operation
    }
}

I have the following questions:

  1. Can I somehow avoid duplicating the try-catch block in the stream, where I'm mapping CompletableFuture to User?
  2. Can this code be less sequential (that is, how can I avoid waiting for all the futures to finish)?
  3. Is it OK to do it this way (will all the futures be resolved in the stream)?

    public class Main {
        public static void main(String[] args) {
            String prefix = "collection_";
    
            List<User> usersResult = IntStream.range(1, 10)
                    .boxed()
                    .map(num -> prefix.concat("" + num))
                    .map(name -> CompletableFuture.supplyAsync(
                            () -> callApi(name)))
                    .filter(Objects::nonNull)
                    .map(userCompletableFuture -> {
                        try {
                            return userCompletableFuture.get();
                        } catch (InterruptedException | ExecutionException e) {
                            e.printStackTrace();
                        }
                        return null;
                    })
                    .collect(Collectors.toList());
        }
    
        private static User callApi(String collection) {
            return new User(); //potentially time-consuming operation
        }
    }
    
agienka asked Oct 30 '17 22:10




1 Answer

As a first simplification, you can skip the allOf().get() call entirely, since you wait on each future one by one anyway.¹

To avoid the duplicated try-catch (your question 1), you can do the following:

  • use exceptionally() to handle exceptions directly in the future;
  • use join() instead of get() to avoid checked exceptions (join() throws only the unchecked CompletionException, and once exceptionally() has handled the failure, no exception can reach it).
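To make the difference between the two bullets concrete, here is a minimal sketch. The class name JoinVsGet and the always-failing failingCall() are made up for illustration; only get(), join() and exceptionally() are the real CompletableFuture methods.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class JoinVsGet {
    // Hypothetical stand-in for a remote call that always fails.
    static String failingCall() {
        throw new IllegalStateException("remote call failed");
    }

    // get() declares checked exceptions, so the caller is forced into a try-catch.
    static String describeGet() {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(JoinVsGet::failingCall);
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "get: " + e.getCause().getClass().getSimpleName();
        }
    }

    // join() reports the same failure as an unchecked CompletionException.
    static String describeJoin() {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(JoinVsGet::failingCall);
        try {
            return future.join();
        } catch (CompletionException e) {
            return "join: " + e.getCause().getClass().getSimpleName();
        }
    }

    // exceptionally() swaps the failure for a fallback value, so join() returns normally.
    static String describeHandled() {
        return CompletableFuture.supplyAsync(JoinVsGet::failingCall)
                .exceptionally(e -> "fallback")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(describeGet());
        System.out.println(describeJoin());
        System.out.println(describeHandled());
    }
}
```

Only the last variant needs no try-catch at the call site, which is what lets the stream use a plain method reference.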

As for question 2, you cannot really make it less sequential, since you need at least two steps: creating all the futures, then processing their results.

As for question 3: streams process elements lazily, one at a time, so if you do everything in a single stream, it will create each future and then immediately wait on it before creating the next one, and you would lose the parallelism. You could use a parallel stream instead, but then there wouldn't be much benefit in using CompletableFuture at all.
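The ordering problem with a single stream can be observed directly by logging when each future is created and when it is joined. This is only an illustrative sketch: the class StreamOrdering and the trivial `() -> i` task are invented for the demo and stand in for the real API call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class StreamOrdering {
    // Single pipeline: each element passes through every stage before the next one starts.
    static List<String> singleStream() {
        List<String> events = new ArrayList<>();
        IntStream.rangeClosed(1, 3)
                .boxed()
                .map(i -> {
                    events.add("create " + i);
                    return CompletableFuture.supplyAsync(() -> i);
                })
                .map(future -> {
                    Integer value = future.join(); // blocks before the next future exists
                    events.add("join " + value);
                    return value;
                })
                .collect(Collectors.toList());
        return events;
    }

    // Two steps: collecting first forces all futures to be created before any join.
    static List<String> twoSteps() {
        List<String> events = new ArrayList<>();
        List<CompletableFuture<Integer>> futures = IntStream.rangeClosed(1, 3)
                .boxed()
                .map(i -> {
                    events.add("create " + i);
                    return CompletableFuture.supplyAsync(() -> i);
                })
                .collect(Collectors.toList());
        futures.forEach(future -> events.add("join " + future.join()));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(singleStream());
        System.out.println(twoSteps());
    }
}
```

The first list interleaves create and join (create 1, join 1, create 2, ...), while the second shows all three creates before the first join, so only the two-step version lets the calls overlap.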

So the final code is:

List<CompletableFuture<User>> usersResult = IntStream.range(1, 10)
        .boxed()
        .map(num -> prefix.concat("" + num))
        .map(name -> CompletableFuture.supplyAsync(() -> callApi(name))
            .exceptionally(e -> {
                e.printStackTrace();
                return null;
            }))
        .collect(Collectors.toList());

List<User> users = usersResult
        .stream()
        .map(CompletableFuture::join)
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
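For anyone who wants to run this, here is a self-contained sketch of the same two steps. The User class, the failure for "collection_3" and the fetchCollections() wrapper are assumptions added for the demo; they are not part of the question's code.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CollectUsers {
    // Minimal stand-in for the question's User type.
    static final class User {
        final String collection;

        User(String collection) {
            this.collection = collection;
        }
    }

    // Hypothetical stub: fails for one collection to simulate a broken external call.
    static User callApi(String collection) {
        if (collection.endsWith("_3")) {
            throw new IllegalStateException("no such collection: " + collection);
        }
        return new User(collection);
    }

    static List<String> fetchCollections() {
        String prefix = "collection_";
        // Step 1: start every call; a failed call yields null instead of an exception.
        List<CompletableFuture<User>> futures = IntStream.range(1, 10)
                .mapToObj(num -> prefix + num)
                .map(name -> CompletableFuture.supplyAsync(() -> callApi(name))
                        .exceptionally(e -> null))
                .collect(Collectors.toList());
        // Step 2: wait for each result and drop the failed ones.
        return futures.stream()
                .map(CompletableFuture::join)
                .filter(Objects::nonNull)
                .map(user -> user.collection)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(fetchCollections());
    }
}
```

With this stub, nine calls are started and eight names come back, in the original order and without collection_3.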

¹ Note that an allOf() call is still needed if you want the result itself to be a CompletableFuture<List<User>>, e.g.:

final CompletableFuture<List<User>> result =
        CompletableFuture.allOf(usersResult.stream().toArray(CompletableFuture[]::new))
                .thenApply(__ -> usersResult
                        .stream()
                        .map(CompletableFuture::join)
                        .filter(Objects::nonNull)
                        .collect(Collectors.toList()));
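If you need this in more than one place, the same idea can be wrapped in a small generic helper. This is a sketch only: the class FutureLists and the method allAsList are hypothetical names, not part of the JDK.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class FutureLists {
    // Hypothetical helper: turns a list of futures into one future of a list, keeping the order.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // does not block: allOf has already completed
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> squares = IntStream.rangeClosed(1, 5)
                .mapToObj(i -> CompletableFuture.supplyAsync(() -> i * i))
                .collect(Collectors.toList());
        // join() here is only for the demo; real callers would keep chaining on the future.
        System.out.println(allAsList(squares).join()); // prints [1, 4, 9, 16, 25]
    }
}
```

Keep in mind that if any input future fails, allOf() completes exceptionally and the combined future fails too, unless each input was given an exceptionally() fallback as in the code above.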
Didier L answered Oct 17 '22 15:10
