I am trying to call a REST API with PUT requests in a loop. Each call is a CompletableFuture, and each API call returns an object of type RoomTypes.RoomType.
I want to collect the responses (both successful and error responses) in different lists. How do I achieve that? I don't think I can use allOf, because it would not give me all the results if any one call fails to update.
How do I log errors/exceptions for each call?
public void sendRequestsAsync(Map<Integer, List> map1) {
    List<CompletableFuture<Void>> completableFutures = new ArrayList<>(); //List to hold all the completable futures
    List<RoomTypes.RoomType> responses = new ArrayList<>(); //List for responses
    ExecutorService yourOwnExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    for (Map.Entry<Integer, List> entry : map1.entrySet()) {
        CompletableFuture requestCompletableFuture = CompletableFuture
                .supplyAsync(
                        () ->
                                //API call which returns object of type RoomTypes.RoomType
                                updateService.updateRoom(51, 33, 759, entry.getKey(),
                                        new RoomTypes.RoomType(entry.getKey(), map2.get(entry.getKey()),
                                                entry.getValue())),
                        yourOwnExecutor
                )//Supply the task you wanna run, in your case http request
                .thenApply(responses::add);
        completableFutures.add(requestCompletableFuture);
    }
}
You can simply use allOf() to get a future that is completed when all your initial futures are completed (exceptionally or not), and then split them into succeeded and failed using Collectors.partitioningBy():
List<CompletableFuture<RoomTypes.RoomType>> completableFutures = new ArrayList<>(); //List to hold all the completable futures
ExecutorService yourOwnExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
for (Map.Entry<Integer, List> entry : map1.entrySet()) {
    CompletableFuture<RoomTypes.RoomType> requestCompletableFuture = CompletableFuture
            .supplyAsync(
                    () ->
                            //API call which returns object of type RoomTypes.RoomType
                            updateService.updateRoom(51, 33, 759, entry.getKey(),
                                    new RoomTypes.RoomType(entry.getKey(), map2.get(entry.getKey()),
                                            entry.getValue())),
                    yourOwnExecutor
            );
    completableFutures.add(requestCompletableFuture);
}
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]))
        // avoid throwing an exception in the join() call
        .exceptionally(ex -> null)
        .join();
Map<Boolean, List<CompletableFuture<RoomTypes.RoomType>>> result =
        completableFutures.stream()
                .collect(Collectors.partitioningBy(CompletableFuture::isCompletedExceptionally));
The resulting map will contain one entry with the key true for the failed futures, and another entry with the key false for the succeeded ones. You can then inspect the two entries and act accordingly.
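To make that last step concrete, here is a minimal sketch of how the two entries could be unpacked into plain lists. It uses String results in place of RoomTypes.RoomType and supplyAsync lambdas in place of the real service call; the class name PartitionDemo and the helpers successes() and failures() are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

public class PartitionDemo {

    // Runs three stand-in tasks (one fails) and partitions the completed futures.
    static Map<Boolean, List<CompletableFuture<String>>> run() {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        futures.add(CompletableFuture.supplyAsync(() -> "One"));
        futures.add(CompletableFuture.<String>supplyAsync(() -> {
            throw new IllegalStateException("boom");
        }));
        futures.add(CompletableFuture.supplyAsync(() -> "Three"));

        // Wait for all futures; swallow the aggregate failure so join() does not throw.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .exceptionally(ex -> null)
                .join();

        return futures.stream()
                .collect(Collectors.partitioningBy(CompletableFuture::isCompletedExceptionally));
    }

    // Values of the futures under the key false, in submission order.
    static List<String> successes(Map<Boolean, List<CompletableFuture<String>>> result) {
        return result.get(false).stream()
                .map(CompletableFuture::join) // completed normally, so join() returns at once
                .collect(Collectors.toList());
    }

    // Underlying exceptions of the futures under the key true.
    static List<Throwable> failures(Map<Boolean, List<CompletableFuture<String>>> result) {
        List<Throwable> errors = new ArrayList<>();
        for (CompletableFuture<String> f : result.get(true)) {
            try {
                f.join();
            } catch (CompletionException e) {
                // join() wraps the original exception; fall back to the wrapper if there is no cause
                errors.add(e.getCause() != null ? e.getCause() : e);
            } catch (CancellationException e) {
                errors.add(e); // cancelled futures also count as completed exceptionally
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        Map<Boolean, List<CompletableFuture<String>>> result = run();
        System.out.println(successes(result));
        System.out.println(failures(result));
    }
}
```

Because the partitioning streams over the original list, the successes keep their submission order rather than their completion order.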
Note that there are 2 slight changes compared to your original code:
- requestCompletableFuture is now a CompletableFuture<RoomTypes.RoomType>
- thenApply(responses::add) and the responses list were removed
Concerning logging/exception handling, just add the relevant requestCompletableFuture.handle() call to log each result individually, but keep the requestCompletableFuture in the list and not the future returned by handle(). The future returned by handle() completes normally even when the request failed (unless the handler itself throws), so it would always end up under the false key.
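As a rough sketch of that logging advice: the helper below attaches a handle() callback for its side effect only and hands back the original future, so partitioning still sees the real outcome. The class LoggingDemo, the method withLogging() and the in-memory LOG list are assumptions made for this example; a real application would call its own logger instead.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;

public class LoggingDemo {

    // Stand-in for a real logger (assumption): messages go to a thread-safe list.
    static final List<String> LOG = new CopyOnWriteArrayList<>();

    // Attaches a logging callback and returns the ORIGINAL future, not the one from handle().
    static <T> CompletableFuture<T> withLogging(String label, CompletableFuture<T> request) {
        request.handle((value, ex) -> {
            if (ex == null) {
                LOG.add(label + " succeeded: " + value);
            } else {
                // Failures from supplyAsync arrive wrapped in a CompletionException.
                Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                        ? ex.getCause() : ex;
                LOG.add(label + " failed: " + cause.getMessage());
            }
            return null; // the derived future's value is never used
        });
        return request;
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok =
                withLogging("call 1", CompletableFuture.supplyAsync(() -> "done"));
        CompletableFuture<String> bad =
                withLogging("call 2", CompletableFuture.<String>supplyAsync(() -> {
                    throw new IllegalStateException("boom");
                }));

        CompletableFuture.allOf(ok, bad).exceptionally(ex -> null).join();

        // The original futures still report how the calls really ended.
        System.out.println(ok.isCompletedExceptionally());
        System.out.println(bad.isCompletedExceptionally());
    }
}
```

The callback runs on whichever thread completes the request, so a log entry can appear slightly after a join() on the original future has returned.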
Alternatively, you could approach the problem from a different perspective: instead of forcing the use of CompletableFuture, you can use a CompletionService.
The whole idea of the CompletionService is that as soon as the result of a given future is ready, that future is placed in a queue from which you can consume results.
Alternative 1: Without CompletableFuture
CompletionService<String> cs = new ExecutorCompletionService<>(executor);
List<Future<String>> futures = new ArrayList<>();
futures.add(cs.submit(() -> "One"));
futures.add(cs.submit(() -> "Two"));
futures.add(cs.submit(() -> "Three"));
futures.add(cs.submit(() -> { throw new RuntimeException("Sucks to be four"); }));
futures.add(cs.submit(() -> "Five"));
List<String> successes = new ArrayList<>();
List<String> failures = new ArrayList<>();
while (futures.size() > 0) {
Future<String> f = cs.poll();
if (f != null) {
futures.remove(f);
try {
//at this point the future is guaranteed to be solved
//so there won't be any blocking here
String value = f.get();
successes.add(value);
} catch (Exception e) {
failures.add(e.getMessage());
}
}
}
System.out.println(successes);
System.out.println(failures);
Which yields (the order of the successes can vary, because results are consumed in completion order):
[One, Two, Three, Five]
[java.lang.RuntimeException: Sucks to be four]
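The loop above spins on poll() while it waits. A possible variation, sketched here under the assumption that blocking the calling thread is acceptable, counts the submitted tasks and calls take(), which waits until the next result is available. The class name TakeDemo and the Outcome holder are hypothetical, and the sketch records the cause's message rather than the message of the wrapping ExecutionException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TakeDemo {

    // Simple holder for the two result lists (hypothetical helper type).
    static class Outcome {
        final List<String> successes = new ArrayList<>();
        final List<String> failures = new ArrayList<>();
    }

    static Outcome run() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CompletionService<String> cs = new ExecutorCompletionService<>(executor);
        Outcome outcome = new Outcome();
        try {
            int pending = 0;
            cs.submit(() -> "One"); pending++;
            cs.submit(() -> "Two"); pending++;
            cs.submit(() -> { throw new RuntimeException("Sucks to be four"); }); pending++;
            cs.submit(() -> "Three"); pending++;

            while (pending-- > 0) {
                // take() blocks until some task has finished, so there is no busy-waiting
                Future<String> f = cs.take();
                try {
                    outcome.successes.add(f.get()); // already done, returns immediately
                } catch (ExecutionException e) {
                    outcome.failures.add(e.getCause().getMessage());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and stop collecting
        } finally {
            executor.shutdown();
        }
        return outcome;
    }

    public static void main(String[] args) {
        Outcome outcome = run();
        System.out.println(outcome.successes);
        System.out.println(outcome.failures);
    }
}
```

Counting submissions replaces the futures list as the loop's termination condition, which also avoids the linear futures.remove(f) lookup.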
Alternative 2: With CompletableFuture
However, if you really, really need to deal with CompletableFuture, you can hand those to the completion service as well, just by placing them directly into its queue.
For example, the following variation has the same result:
BlockingQueue<Future<String>> tasks = new ArrayBlockingQueue<>(5);
CompletionService<String> cs = new ExecutorCompletionService<>(executor, tasks);
List<Future<String>> futures = new ArrayList<>();
futures.add(CompletableFuture.supplyAsync(() -> "One"));
futures.add(CompletableFuture.supplyAsync(() -> "Two"));
futures.add(CompletableFuture.supplyAsync(() -> "Three"));
futures.add(CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Sucks to be four"); }));
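//use a CompletableFuture here too: a task submitted through cs.submit() is enqueued
//by the service itself on completion, which would overflow the 5-slot queue below
futures.add(CompletableFuture.supplyAsync(() -> "Five"));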
//places all futures in completion service queue
tasks.addAll(futures);
List<String> successes = new ArrayList<>();
List<String> failures = new ArrayList<>();
while (futures.size() > 0) {
Future<String> f = cs.poll();
if (f != null) {
futures.remove(f);
try {
//these futures were placed in the queue up front, so they are not
//necessarily completed yet and get() may block until they are
String value = f.get();
successes.add(value);
} catch (Exception e) {
failures.add(e.getMessage());
}
}
}