Normally with a CompletableFuture I would call thenApply or some other method on it to do something as soon as the result is available. However, I now have a situation where I want to process results until I receive a positive result and then ignore all further results.
If I just wanted to take the first available result I could use CompletableFuture.anyOf (although I hate having to convert a list to an array just to call anyOf). But that's not what I want. I want to take the first result and if it does not have a desirable result then I want to process the second available result and so on until I get a desirable result.
Here's a simple example which goes through all results and returns the first value it finds which is greater than 9. (Note that this is not my real task. This is just a simple example.)
public Integer findFirstGt9(List<CompletableFuture<Integer>> results)
        throws InterruptedException, ExecutionException {
    for (CompletableFuture<Integer> result : results) {
        Integer v = result.get(); // blocks on each future in list order
        if (v > 9)
            return v;
    }
    return null;
}
Of course, that example goes through the results from the beginning, not by looking at results as they complete. So here is one that accomplishes what I want, but with much more complicated code.
public Integer findFirstGt9(List<CompletableFuture<Integer>> results)
        throws InterruptedException {
    AtomicInteger finalResult = new AtomicInteger();
    CountDownLatch latch = new CountDownLatch(results.size());
    for (CompletableFuture<Integer> result : results) {
        result.whenComplete((v, e) -> {
            if (e != null) {
                Logger.getLogger(getClass()).error("", e);
            } else if (v > 9) {
                finalResult.set(v);
                // release the waiting thread immediately
                while (latch.getCount() > 0)
                    latch.countDown();
                return;
            }
            latch.countDown();
        });
    }
    latch.await();
    if (finalResult.get() > 9)
        return finalResult.get();
    return null;
}
Is there an API that lets me just do this?
public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    Iterator<Integer> resultIt = getResultsAsAvailable(results);
    while (resultIt.hasNext()) {
        Integer v = resultIt.next();
        if (v > 9)
            return v;
    }
    return null;
}
Or even better:
public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    return getFirstMatch(results, r -> r > 9);
}
I don't know of any such API in the JDK or elsewhere. You can roll your own.
You can take advantage of the fact that CompletableFuture#complete (and completeExceptionally) does nothing if the future has already been completed. As the Javadoc for complete puts it:
"If not already completed, sets the value returned by get() and related methods to the given value."
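To make that first-one-wins behavior concrete, here is a minimal sketch. The class name CompleteOnceDemo and the method describe are made up for illustration; only complete, completeExceptionally and join are real CompletableFuture methods.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of any library.
class CompleteOnceDemo {
    // Returns the outcome of three completion attempts plus the final value.
    static String describe() {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        boolean first = f.complete(42);  // transitions the future to completed
        boolean second = f.complete(7);  // ignored: already completed
        boolean third = f.completeExceptionally(
                new IllegalStateException("too late")); // ignored as well
        return first + " " + second + " " + third + " " + f.join();
    }

    public static void main(String[] args) {
        System.out.println(describe()); // prints "true false false 42"
    }
}
```

Both methods return a boolean telling you whether this particular call was the one that completed the future, so losing attempts are cheap and safe.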
Create a new final result CompletableFuture. Add a continuation to each of your futures that attempts to complete this final result if your condition applies. That future will complete with the first success. However, if none succeed, you apparently need null as a result. You can create a CompletableFuture with allOf to also attempt to complete the final result with null.
Something like
public static <T> CompletableFuture<T> firstOrNull(List<CompletableFuture<T>> futures, Predicate<T> condition) {
    CompletableFuture<T> finalResult = new CompletableFuture<>();
    // attempt to complete on success
    futures.forEach(future -> future.thenAccept(successResult -> {
        if (condition.test(successResult))
            finalResult.complete(successResult);
    }));
    // attempt to complete with null once all source futures are done
    CompletableFuture<?> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    all.thenRun(() -> {
        finalResult.complete(null);
    });
    return finalResult;
}
You pay the overhead of no-op invocations.
You can change null to some default value as appropriate or handle exceptions differently (completeExceptionally as soon as an error occurs). You'll have to use whenComplete or handle instead of the thenAccept above to gain access to the exception.
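As a rough sketch of that fail-fast variant, the code below swaps thenAccept for whenComplete. The names FirstMatchFailFast and firstOrNullFailFast are invented for this example. It also assumes you want the null fallback to wait for the callbacks themselves, so allOf is built over the stages returned by whenComplete rather than over the source futures.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

// Hypothetical helper class, named for illustration only.
class FirstMatchFailFast {
    static <T> CompletableFuture<T> firstOrNullFailFast(
            List<CompletableFuture<T>> futures, Predicate<T> condition) {
        CompletableFuture<T> finalResult = new CompletableFuture<>();
        CompletableFuture<?>[] attempts = futures.stream()
                .map(future -> future.whenComplete((value, error) -> {
                    if (error != null) {
                        // first failure wins; a no-op if already completed
                        finalResult.completeExceptionally(error);
                    } else if (condition.test(value)) {
                        finalResult.complete(value);
                    }
                }))
                .toArray(CompletableFuture<?>[]::new);
        // Runs only after every callback above has run.
        CompletableFuture.allOf(attempts).whenComplete((ignored, t) -> {
            if (t != null)
                finalResult.completeExceptionally(t); // e.g. the predicate threw
            else
                finalResult.complete(null);           // nothing matched
        });
        return finalResult;
    }
}
```

Whether a failure should pre-empt a later match is a design choice. With this sketch, whichever of "first match" and "first failure" happens first decides the outcome.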
You can use the following solution:
public static <T> CompletableFuture<T> anyMatch(
        List<? extends CompletionStage<? extends T>> l, Predicate<? super T> criteria) {

    CompletableFuture<T> result = new CompletableFuture<>();
    Consumer<T> whenMatching = v -> { if (criteria.test(v)) result.complete(v); };
    CompletableFuture.allOf(l.stream()
            .map(f -> f.thenAccept(whenMatching)).toArray(CompletableFuture<?>[]::new))
        .whenComplete((ignored, t) ->
            result.completeExceptionally(t != null ? t : new NoSuchElementException()));
    return result;
}
The basic principle is the same as in Pillar’s answer; however, there are some differences:
- The call to CompletableFuture.allOf is combined with the registration of the follow-up action on the source futures. As a side effect, the handler of the allOf action depends on the completion of all attempts to complete the result, rather than on the original futures only. This makes the actually desired dependency explicit. That way, it would even work if we replaced every thenAccept with thenAcceptAsync.
- The result completes exceptionally with a NoSuchElementException rather than returning null when no result meets the criteria. If at least one future completed exceptionally and there is no successful completion with a matching result, one of the exceptions that occurred is relayed.
You may try it with:
List<CompletableFuture<Integer>> list = Arrays.asList(
    CompletableFuture.supplyAsync(() -> 5),
    CompletableFuture.supplyAsync(() -> { throw new RuntimeException(); }),
    CompletableFuture.supplyAsync(() -> 42),
    CompletableFuture.completedFuture(0)
);
anyMatch(list, i -> i > 9)
    .thenAccept(i -> System.out.println("got " + i))
    // optionally chain with:
    .whenComplete((x, t) -> { if (t != null) t.printStackTrace(); });
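If you still need the blocking signature from the question, one possible way is to wrap anyMatch, sketched below. The wrapper class BlockingFirstMatch is an invented name, and anyMatch is repeated from above only so the snippet compiles on its own. Mapping every failure to null is an assumption made to mirror the question's "return null when nothing matched" behavior. It also hides real errors, so you may prefer to log or rethrow them.

```java
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical wrapper class, named for illustration only.
class BlockingFirstMatch {
    // Same anyMatch as in the answer above, repeated for self-containment.
    static <T> CompletableFuture<T> anyMatch(
            List<? extends CompletionStage<? extends T>> l, Predicate<? super T> criteria) {
        CompletableFuture<T> result = new CompletableFuture<>();
        Consumer<T> whenMatching = v -> { if (criteria.test(v)) result.complete(v); };
        CompletableFuture.allOf(l.stream()
                .map(f -> f.thenAccept(whenMatching)).toArray(CompletableFuture<?>[]::new))
            .whenComplete((ignored, t) ->
                result.completeExceptionally(t != null ? t : new NoSuchElementException()));
        return result;
    }

    // Blocks until a value greater than 9 arrives or all futures are done.
    static Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
        return anyMatch(results, v -> v > 9)
                .exceptionally(t -> null) // assumption: no match or failure -> null
                .join();
    }
}
```

Because join is only called on the combined future, the calling thread wakes up as soon as the first matching result is in, regardless of list order.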