
In Java, how do I process CompletableFutures and get the first desirable result that completes?

Normally with a CompletableFuture I would call thenApply or some other method on it to do something as soon as the result is available. However, I now have a situation where I want to process results until I receive a positive result and then ignore all further results.

If I just wanted to take the first available result I could use CompletableFuture.anyOf (although I hate having to convert a list to an array just to call anyOf). But that's not what I want. I want to take the first result that completes and, if it is not a desirable result, process the next one to complete, and so on until I get a desirable result.

Here's a simple example which goes through all results and returns the first value it finds which is greater than 9. (Note that this is not my real task. This is just a simple example.)

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    for(CompletableFuture<Integer> result : results) {
        Integer v = result.join(); // join() throws only unchecked exceptions, unlike get()
        if(v > 9)
            return v;
    }
    return null;
}

Of course, that example goes through the results from the beginning, not by looking at results as they complete. So here is one that accomplishes what I want, but with much more complicated code.

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) throws InterruptedException {
    AtomicInteger finalResult = new AtomicInteger();
    CountDownLatch latch = new CountDownLatch(results.size());
    for(CompletableFuture<Integer> result : results) {
        result.whenComplete((v,e) -> {
            if(e!=null) {
                Logger.getLogger(getClass()).error("",e);
            } else if(v > 9) {
                finalResult.set(v);
                while(latch.getCount() > 0)
                    latch.countDown();
                return;
            }
            latch.countDown();
        });
    }
    latch.await();

    if(finalResult.get() > 9)
        return finalResult.get();
    return null;
}    

Is there an API where I can just do this?

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    Iterator<Integer> resultIt = getResultsAsAvailable(results);
    while(resultIt.hasNext()) {
        Integer v = resultIt.next();
        if(v > 9)
            return v;
    }
    return null;
}

Or even better:

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    return getFirstMatch(results, r -> r > 9);
}
HappyEngineer asked Apr 14 '16 01:04


2 Answers

I don't know of any such API in the JDK or elsewhere. You can roll your own.

You can take advantage of the fact that CompletableFuture#complete (and completeExceptionally) does nothing if the future has already been completed.

The Javadoc for complete says: "If not already completed, sets the value returned by get() and related methods to the given value."
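To make that behaviour concrete, here is a minimal sketch. The class and method names (CompleteOnceDemo, demo) are made up for illustration; only complete, completeExceptionally and join are real CompletableFuture methods.

```java
import java.util.concurrent.CompletableFuture;

public class CompleteOnceDemo {
    // Hypothetical demo method: tries to complete the same future three times.
    static String demo() {
        CompletableFuture<Integer> finalResult = new CompletableFuture<>();

        // The first call wins and moves the future to the completed state.
        boolean first = finalResult.complete(42);
        // Later calls leave the existing value in place and return false.
        boolean second = finalResult.complete(7);
        boolean third = finalResult.completeExceptionally(new IllegalStateException("too late"));

        return first + " " + second + " " + third + " " + finalResult.join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true false false 42"
    }
}
```

The boolean return value tells you whether that particular call was the one that completed the future, which is what lets several continuations race to set the final result safely.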

Create a new final result CompletableFuture. Add a continuation to each of your futures that attempts to complete this final result if your condition applies. The final result then completes with the first success. However, if none succeed, you apparently need null as a result. For that, you can combine the futures with allOf and, once all of them have completed, also attempt to complete the final result with null.

Something like

public static <T> CompletableFuture<T> firstOrNull(List<CompletableFuture<T>> futures, Predicate<T> condition) {
    CompletableFuture<T> finalResult = new CompletableFuture<>();
    // attempt to complete on success
    futures.forEach(future -> future.thenAccept(successResult -> {
        if (condition.test(successResult))
            finalResult.complete(successResult);
    }));
    CompletableFuture<?> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    all.thenRun(() -> {
        finalResult.complete(null);
    });
    return finalResult;
}

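You pay the overhead of no-op complete invocations once the final result has already been set.

You can change null to some default value as appropriate or handle exceptions differently (for example, completeExceptionally as soon as an error occurs). You'll have to use whenComplete or handle instead of the thenAccept above to gain access to the exception. Note that, as written, if any of the futures completes exceptionally, the allOf future does too, so the thenRun action never runs and the final result stays incomplete unless some other future produces a match.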
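As a rough sketch of the exception-handling variation mentioned above, the version below uses whenComplete so that the first error fails the final result immediately. The names FirstOrFail and firstOrFail are hypothetical, and failing fast is only one possible policy, not necessarily the one you want.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

public class FirstOrFail {
    // Hypothetical variant of firstOrNull: completes with the first matching value,
    // fails as soon as any future fails, or yields null if nothing matches.
    public static <T> CompletableFuture<T> firstOrFail(List<CompletableFuture<T>> futures, Predicate<T> condition) {
        CompletableFuture<T> finalResult = new CompletableFuture<>();
        CompletableFuture<?>[] attempts = futures.stream()
            .map(future -> future.whenComplete((value, error) -> {
                if (error != null)
                    finalResult.completeExceptionally(error);
                else if (condition.test(value))
                    finalResult.complete(value);
            }))
            .toArray(CompletableFuture<?>[]::new);
        // Runs once every attempt above has finished, whether it succeeded or failed.
        // If the final result is already set, this complete(null) is a no-op.
        CompletableFuture.allOf(attempts).whenComplete((ignored, error) -> finalResult.complete(null));
        return finalResult;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        CompletableFuture<Integer> result = firstOrFail(Arrays.asList(a, b), v -> v > 9);
        a.complete(5);   // no match, result stays pending
        b.complete(42);  // match, result completes
        System.out.println(result.join()); // prints 42
    }
}
```

Because the fallback is attached to the futures returned by whenComplete rather than to the original futures, it only runs after every completion attempt has been made.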

Savior answered Sep 30 '22 18:09


You can use the following solution:

public static <T> CompletableFuture<T> anyMatch(
    List<? extends CompletionStage<? extends T>> l, Predicate<? super T> criteria) {

    CompletableFuture<T> result=new CompletableFuture<>();
    Consumer<T> whenMatching=v -> { if(criteria.test(v)) result.complete(v); };
    CompletableFuture.allOf(l.stream()
        .map(f -> f.thenAccept(whenMatching).toCompletableFuture())
        .toArray(CompletableFuture<?>[]::new))
    .whenComplete((ignored, t) ->
        result.completeExceptionally(t!=null? t: new NoSuchElementException()));
    return result;
}

The basic principle is the same as in Pillar’s answer (the other answer here); however, there are some differences:

  • The generic signature is more flexible.
  • The creation of the array necessary for CompletableFuture.allOf is combined with the registration of the follow-up action to the source futures. As a side effect, the handler of the allOf action is dependent on the completion of all attempts to complete the result, rather than the original futures only. This makes the actually desired dependency explicit. That way, it would even work when we replace all thenAccepts with thenAcceptAsyncs.
  • This solution completes with a NoSuchElementException rather than returning null in the case that no result meets the criteria. If at least one future completed exceptionally and there is no successful completion with a matching result, one of the exceptions that occurred is relayed.
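If you would rather get null for the no-match case, as in the question, one option is to map the NoSuchElementException back to null on the caller's side. The helper below is a sketch under that assumption; NoMatchToNull and orNull are hypothetical names, and the demo completes a plain future by hand instead of calling anyMatch.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class NoMatchToNull {
    // Hypothetical helper: turns the "no match" failure back into a null result
    // and passes every other failure through.
    public static <T> CompletableFuture<T> orNull(CompletableFuture<T> matchResult) {
        return matchResult.exceptionally(t -> {
            // Dependent stages may see the failure wrapped in a CompletionException.
            Throwable cause = (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
            if (cause instanceof NoSuchElementException)
                return null;
            throw (t instanceof CompletionException) ? (CompletionException) t : new CompletionException(t);
        });
    }

    public static void main(String[] args) {
        // Stand-in for a future returned by anyMatch when nothing matched.
        CompletableFuture<Integer> noMatch = new CompletableFuture<>();
        noMatch.completeExceptionally(new NoSuchElementException());
        System.out.println(orNull(noMatch).join()); // prints null
    }
}
```

Keeping the exception by default and converting it only where null is acceptable lets callers distinguish "nothing matched" from a genuine null result.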

You may try it with

List<CompletableFuture<Integer>> list=Arrays.asList(
    CompletableFuture.supplyAsync(()->5),
    CompletableFuture.supplyAsync(()->{throw new RuntimeException(); }),
    CompletableFuture.supplyAsync(()->42),
    CompletableFuture.completedFuture(0)
);
anyMatch(list, i -> i>9)
    .thenAccept(i->System.out.println("got "+i))
    // optionally chain with:
    .whenComplete((x,t)->{ if(t!=null) t.printStackTrace(); });
Holger answered Sep 30 '22 16:09