
CompletableFuture in loop: How to collect all responses and handle errors

I am trying to call a REST API with a PUT request in a loop. Each call is a CompletableFuture, and each API call returns an object of type RoomTypes.RoomType.

  • I want to collect the responses (both successful and error responses) in different lists. How do I achieve that? I am sure I cannot use allOf because it would not get all the results if any one call fails to update.

  • How do I log the errors/exceptions for each call?


public void sendRequestsAsync(Map<Integer, List> map1) {
    List<CompletableFuture<Void>> completableFutures = new ArrayList<>(); //List to hold all the completable futures
    List<RoomTypes.RoomType> responses = new ArrayList<>(); //List for responses
    ExecutorService yourOwnExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    for (Map.Entry<Integer, List> entry : map1.entrySet()) {
        CompletableFuture requestCompletableFuture = CompletableFuture
                .supplyAsync(
                        () ->
                                //API call which returns object of type RoomTypes.RoomType
                                updateService.updateRoom(51, 33, 759, entry.getKey(),
                                        new RoomTypes.RoomType(entry.getKey(), map2.get(entry.getKey()),
                                                entry.getValue())),
                        yourOwnExecutor
                ) //Supply the task you want to run, in your case an HTTP request
                .thenApply(responses::add);

        completableFutures.add(requestCompletableFuture);
    }
}
Rudrani Angira asked Jul 02 '18 15:07




2 Answers

You can simply use allOf() to get a future that is completed when all your initial futures are completed (exceptionally or not), and then split them between succeeded and failed using Collectors.partitioningBy():

List<CompletableFuture<RoomTypes.RoomType>> completableFutures = new ArrayList<>(); //List to hold all the completable futures
ExecutorService yourOwnExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

for (Map.Entry<Integer, List> entry : map1.entrySet()) {
    CompletableFuture<RoomTypes.RoomType> requestCompletableFuture = CompletableFuture
            .supplyAsync(
                    () ->
                //API call which returns object of type RoomTypes.RoomType
                updateService.updateRoom(51, 33, 759, entry.getKey(),
                        new RoomTypes.RoomType(entry.getKey(), map2.get(entry.getKey()),
                                entry.getValue())),
                    yourOwnExecutor
            );

    completableFutures.add(requestCompletableFuture);
}

CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]))
        // avoid throwing an exception in the join() call
        .exceptionally(ex -> null)
        .join();
Map<Boolean, List<CompletableFuture<RoomTypes.RoomType>>> result =
        completableFutures.stream()
                .collect(Collectors.partitioningBy(CompletableFuture::isCompletedExceptionally));

The resulting map contains two entries: the key true maps to the failed futures, and the key false maps to the ones that succeeded. You can then inspect both entries and act accordingly.
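As a rough sketch of how the two partitions could be inspected, the helpers below pull the values out of the succeeded futures and the causes out of the failed ones. The class name PartitionedResults and the methods successes and failures are made up for this illustration, and String stands in for RoomTypes.RoomType:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

// Hypothetical helper class, not part of the original answer.
class PartitionedResults {

    // Values of the futures that completed normally (the "false" partition).
    static <T> List<T> successes(Map<Boolean, List<CompletableFuture<T>>> result) {
        return result.get(false).stream()
                .map(CompletableFuture::join) // already complete, so join() does not block
                .collect(Collectors.toList());
    }

    // Causes of the futures that completed exceptionally (the "true" partition).
    static <T> List<Throwable> failures(Map<Boolean, List<CompletableFuture<T>>> result) {
        List<Throwable> causes = new ArrayList<>();
        for (CompletableFuture<T> future : result.get(true)) {
            try {
                future.join();
            } catch (CompletionException e) {
                causes.add(e.getCause()); // join() wraps the original exception
            } catch (CancellationException e) {
                causes.add(e); // a cancelled future also counts as exceptional
            }
        }
        return causes;
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        futures.add(CompletableFuture.supplyAsync(() -> "ok-1"));
        futures.add(CompletableFuture.supplyAsync(() -> { throw new IllegalStateException("boom"); }));
        futures.add(CompletableFuture.supplyAsync(() -> "ok-2"));

        // Wait for everything, swallowing the aggregate failure as in the answer.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .exceptionally(ex -> null)
                .join();

        Map<Boolean, List<CompletableFuture<String>>> result = futures.stream()
                .collect(Collectors.partitioningBy(CompletableFuture::isCompletedExceptionally));

        System.out.println(successes(result));
        System.out.println(failures(result));
    }
}
```

Because allOf() has already completed, every join() here returns or throws immediately.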

Note that there are 2 slight changes compared to your original code:

  • requestCompletableFuture is now a CompletableFuture<RoomTypes.RoomType>
  • thenApply(responses::add) and the responses list were removed

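For logging and exception handling, attach a requestCompletableFuture.handle() callback to each future to log its outcome individually, but add requestCompletableFuture itself to the list, not the future returned by handle(). The latter completes normally once the callback has run, so isCompletedExceptionally() would no longer report the failure.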
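A minimal sketch of that logging advice, assuming java.util.logging is acceptable; the class PerCallLogging and the method logged are names invented for this example:

```java
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper class, not part of the original answer.
class PerCallLogging {
    private static final Logger LOG = Logger.getLogger(PerCallLogging.class.getName());

    // Attaches a logging callback and returns the ORIGINAL future, so that
    // isCompletedExceptionally() on it still reflects the outcome of the call.
    static <T> CompletableFuture<T> logged(String label, CompletableFuture<T> future) {
        future.handle((value, error) -> {
            if (error != null) {
                LOG.log(Level.WARNING, label + " failed", error);
            } else {
                LOG.info(label + " succeeded: " + value);
            }
            return null; // the future returned by handle() is deliberately discarded
        });
        return future;
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok =
                logged("room 1", CompletableFuture.supplyAsync(() -> "updated"));
        CompletableFuture<String> failed =
                logged("room 2", CompletableFuture.supplyAsync(() -> { throw new IllegalStateException("boom"); }));

        // Wait for both without propagating the failure.
        CompletableFuture.allOf(ok, failed).exceptionally(ex -> null).join();

        System.out.println(ok.isCompletedExceptionally());     // false
        System.out.println(failed.isCompletedExceptionally()); // true
    }
}
```

In the loop from the answer, this would amount to adding logged(..., requestCompletableFuture) to completableFutures.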

Didier L answered Sep 20 '22 09:09



Alternatively, you can approach the problem from a different perspective: instead of forcing the use of CompletableFuture, use a CompletionService.

The whole idea of the CompletionService is that as soon as an answer for a given future is ready, it gets placed in a queue from which you can consume results.

Alternative 1: Without CompletableFuture

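//any ExecutorService works here; a fixed pool of 4 threads is an assumption for this example
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletionService<String> cs = new ExecutorCompletionService<>(executor);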

List<Future<String>> futures = new ArrayList<>();

futures.add(cs.submit(() -> "One"));
futures.add(cs.submit(() -> "Two"));
futures.add(cs.submit(() -> "Three"));
futures.add(cs.submit(() -> { throw new RuntimeException("Sucks to be four"); }));
futures.add(cs.submit(() -> "Five"));


List<String> successes = new ArrayList<>();
List<String> failures = new ArrayList<>();

while (futures.size() > 0) {
    Future<String> f = cs.poll();
    if (f != null) {
        futures.remove(f);
        try {
            //at this point the future is guaranteed to be solved
            //so there won't be any blocking here
            String value = f.get();
            successes.add(value);
        } catch (Exception e) {
            failures.add(e.getMessage());
        }
    }
}

System.out.println(successes); 
System.out.println(failures);

Which yields (the order of the successes can vary, since results are consumed as they complete):

[One, Two, Three, Five]
[java.lang.RuntimeException: Sucks to be four]
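The loop above spins on poll() until every future has shown up. One possible variation, sketched here under the assumption that the number of submitted tasks is known, uses CompletionService.take(), which blocks until the next result is available. The class TakeInsteadOfPoll and the method drain are names invented for this example:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper class, not part of the original answer.
class TakeInsteadOfPoll {

    // Consumes exactly `submitted` results, sorting them into the two lists.
    static void drain(CompletionService<String> cs, int submitted,
                      List<String> successes, List<String> failures) {
        try {
            for (int i = 0; i < submitted; i++) {
                Future<String> f = cs.take(); // blocks until some task has completed
                try {
                    successes.add(f.get()); // the future is already complete
                } catch (ExecutionException e) {
                    failures.add(e.getMessage());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop draining, keep the interrupt status
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        CompletionService<String> cs = new ExecutorCompletionService<>(executor);

        cs.submit(() -> "One");
        cs.submit(() -> "Two");
        cs.submit(() -> { throw new RuntimeException("Sucks to be three"); });

        List<String> successes = new ArrayList<>();
        List<String> failures = new ArrayList<>();
        try {
            drain(cs, 3, successes, failures);
        } finally {
            executor.shutdown();
        }

        System.out.println(successes);
        System.out.println(failures);
    }
}
```

Counting the submissions replaces the bookkeeping list of futures, and the calling thread sleeps instead of busy-waiting.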

Alternative 2: With CompletableFuture

However, if you really need to deal with CompletableFuture, you can hand those to the completion service as well by placing them directly into its queue.

For example, the following variation produces the same result:

BlockingQueue<Future<String>> tasks = new ArrayBlockingQueue<>(5);
CompletionService<String> cs = new ExecutorCompletionService<>(executor, tasks);

List<Future<String>> futures = new ArrayList<>();

futures.add(CompletableFuture.supplyAsync(() -> "One"));
futures.add(CompletableFuture.supplyAsync(() -> "Two"));
futures.add(CompletableFuture.supplyAsync(() -> "Three"));
futures.add(CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Sucks to be four"); }));
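//created with supplyAsync like the others: a task submitted through cs.submit() is
//enqueued by the service itself on completion and would overflow the 5-slot queue
futures.add(CompletableFuture.supplyAsync(() -> "Five"));

//places all futures in the completion service queue (they may still be running)
tasks.addAll(futures);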

List<String> successes = new ArrayList<>();
List<String> failures = new ArrayList<>();

while (futures.size() > 0) {
    Future<String> f = cs.poll();
    if (f != null) {
        futures.remove(f);
        try {
            //the futures were enqueued before completing, so unlike in the first
            //alternative get() may block here until the task finishes
            String value = f.get();
            successes.add(value);
        } catch (Exception e) {
            failures.add(e.getMessage());
        }
    }
}
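Since the futures above are queued before they finish, the queue order is the submission order rather than the completion order. A possible way to get completion order with plain CompletableFuture, offered only as a sketch, is to let each future enqueue itself from a whenComplete() callback. The class CompletedFutureQueue and the method completionQueue are names invented for this example:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper class, not part of the original answer.
class CompletedFutureQueue {

    // Returns a queue that receives each future at the moment it completes,
    // whether normally or exceptionally.
    static <T> BlockingQueue<CompletableFuture<T>> completionQueue(List<CompletableFuture<T>> futures) {
        BlockingQueue<CompletableFuture<T>> done = new LinkedBlockingQueue<>();
        for (CompletableFuture<T> future : futures) {
            future.whenComplete((value, error) -> done.add(future));
        }
        return done;
    }

    public static void main(String[] args) throws InterruptedException {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        futures.add(CompletableFuture.supplyAsync(() -> "One"));
        futures.add(CompletableFuture.supplyAsync(() -> "Two"));
        futures.add(CompletableFuture.supplyAsync(() -> "Three"));
        futures.add(CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Sucks to be four"); }));
        futures.add(CompletableFuture.supplyAsync(() -> "Five"));

        BlockingQueue<CompletableFuture<String>> done = completionQueue(futures);

        List<String> successes = new ArrayList<>();
        List<String> failures = new ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
            CompletableFuture<String> f = done.take(); // blocks until the next future completes
            try {
                successes.add(f.join()); // already complete, so join() does not block
            } catch (CompletionException e) {
                failures.add(e.getCause().toString());
            }
        }

        System.out.println(successes);
        System.out.println(failures);
    }
}
```

The queue is unbounded here, so a callback never fails because of capacity, and every future taken from it is guaranteed to be complete.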
Edwin Dalorzo answered Sep 19 '22 09:09
