Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Java CompletableFuture: only first result

After reading this article https://community.oracle.com/docs/DOC-995305 on Oracle site i'm trying to implement the pattern described in "Some Two-to-One Selecting Patterns" paragraph. This last category of patterns also contains two-to-one patterns. But this time, instead of executing the downstream element once, the two upstream elements are completed, and the downstream element is executed when one of the two upstream elements is completed. This might prove very useful when we want to resolve a domain name, for instance. Instead of querying only one domain name server, we might find it more efficient to query a group of domain name servers. We do not expect to have different results from the different servers, so we do not need more answers than the first we get. All the other queries can be safely canceled.

It's simple to implement a scenario where I have only 2 CompleatableFuture but I'm not able to implement the same scenario with 3 or more CompleatableFuture.

I tried this:

    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF1"));
    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF2"));
    CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF3"));
    CompletableFuture<String> cf4 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF4"));

    cf1.applyToEither(
            cf2, s1 -> cf2.applyToEither(
                    cf3, s2 -> cf3.applyToEither(
                            cf4, s3 -> "First result is: " + s3))).thenAccept(System.out::println).join();

FutureMain is my class and this is generateString method

public static String generateString(String input) {
    Random r = new Random();
    int millis = r.nextInt(6) * 1000;
    System.out.println(input + " " + millis);
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return input + ": " + millis;
}

I successfully combined multiple CompleatableFuture when I want all of them completed:

    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF1"));
    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF2"));
    CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF3"));
    CompletableFuture<String> cf4 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF4"));

    CompletableFuture<String> cf5 = CompletableFuture.allOf(cf1, cf2, cf3, cf4).thenApply(
            s -> elaborate(cf1.join(), cf2.join(), cf3.join(), cf4.join())); 

    cf5.thenAccept(System.out::println).join();

Any suggestion?

like image 498
Francesco Iaccarino Avatar asked Mar 10 '23 08:03

Francesco Iaccarino


2 Answers

The “two-to-one” pattern doesn’t scale well to arbitrary numbers. That’s why convenience methods like allOf and anyOf exist. Since you noticed the former, it’s not clear why you ignored the latter:

CompletableFuture<String> cf1
    = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF1"));
CompletableFuture<String> cf2
    = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF2"));
CompletableFuture<String> cf3
    = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF3"));
CompletableFuture<String> cf4
    = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF4"));

CompletableFuture<String> cf5 = CompletableFuture.anyOf(cf1, cf2, cf3, cf4)
    .thenApply(String.class::cast); 

cf5.thenAccept(System.out::println).join();

This does the job of completing once the first completed.

The disadvantage of this method is that it always prefers the first completed, whether it completed exceptionally or not. An alternative, which completes upon the first non-exceptional completion and only completes exceptionally if all futures completed exceptionally, has been shown in this answer:

public static <T>
    CompletableFuture<T> anyOf(List<? extends CompletionStage<? extends T>> l) {

    CompletableFuture<T> f=new CompletableFuture<>();
    Consumer<T> complete=f::complete;
    CompletableFuture.allOf(
        l.stream().map(s -> s.thenAccept(complete)).toArray(CompletableFuture<?>[]::new)
    ).exceptionally(ex -> { f.completeExceptionally(ex); return null; });
    return f;
}

It also makes the type cast obsolete:

CompletableFuture<String> cf1
    = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF1"));
CompletableFuture<String> cf2
    = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF2"));
CompletableFuture<String> cf3
    = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF3"));
CompletableFuture<String> cf4 // to demonstrate that this quick failure is not prefered
    = CompletableFuture.supplyAsync(() -> { throw new RuntimeException(); });

CompletableFuture<String> cf5 = anyOf(Arrays.asList(cf1, cf2, cf3, cf4));

cf5.thenAccept(System.out::println).join();
like image 92
Holger Avatar answered Mar 11 '23 21:03

Holger


Two-to-One Selecting Patterns says:

the downstream element is executed when one of the two upstream elements is completed.

for example, select an user from two servers,a server return an user, and another server will be blocked or for some reason return an user later, regardless which server has been returned an user, the downstream will be executed.

using java8 stream api to implements Two-to-One Selecting Patterns

//the first upstream is always blocked.
CompletableFuture<String> blocked = new CompletableFuture<>();
CompletableFuture<String> upstreams = Stream.of(cf1, cf2, cf3, cf4).reduce(blocked,
        (it, upstream) -> it.applyToEither(upstream, Function.identity()));

upstreams.thenAccept(System.out::println).join();// print "foo"

Your code should be like this:

I imported static method supplyAsync from CompletableFeature for print issues.

CompletableFuture<String> cf1 = supplyAsync(blocked(String.class));
CompletableFuture<String> cf2 = supplyAsync(returnValueLater("bar"));
CompletableFuture<String> cf3 = supplyAsync(blocked(String.class));
CompletableFuture<String> cf4 = supplyAsync(returnValue("foo"));

CompletableFuture<String> upstreams = cf1.applyToEither(cf2, Function.identity())
                                         .applyToEither(cf3, Function.identity())
                                         .applyToEither(cf4, Function.identity());

upstreams.thenAccept(System.out::println).join();// print "foo"

private <T> Supplier<T> returnValue(T value) {
    return returnValue(() -> value);
}

private <T> Supplier<T> blocked(Class<T> type) {
    return returnValue(() -> {
        Thread.currentThread().join();
        return null;
    });
}

private <T> Supplier<T> returnValueLater(T value) {
    return returnValue(() -> {
        Thread.sleep(100);
        return value;
    });
}

private <T> Supplier<T> returnValue(Callable<T> value) {
    return () -> {
        try {
            return value.call();
        } catch (Exception e) { throw new RuntimeException(e); }
    };
}

All of CompletableFuture Patterns

import org.junit.jupiter.api.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.stream.*;
import static java.util.Arrays.asList;
import static java.util.concurrent.CompletableFuture.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
public class CompletableFuturePatternTest {

    @Test @DisplayName("Two-to-One Selecting Pattern")
    void selectingManyToOne() throws Throwable {
        String user = select("select user from User", String.class)
                .from(availableServers())
                .getFirstResult();

        assertThat(user, equalTo("Joe"));
    }

    @Test @DisplayName("Two-to-One Combining Pattern")
    void combiningManyToOne() throws Throwable {
        List<String> users = select("select user from User", String.class)
                .from(availableServers())
                .list();

        assertThat(users, equalTo(asList("Bob", "Joe", "Doe")));
    }

    @Test @DisplayName("One-to-One Pattern")
    void waitUntilUpstreamCompleted() throws Throwable {
        String user = select("select user from User", String.class)
                .from(availableServers())
                .to(String::toUpperCase);

        assertThat(user, equalTo("JOE"));
    }

    private CompletableFuture<String>[] availableServers() {
        return new CompletableFuture[]{
                server(returnValueLater("Bob")),
                server(returnValue("Joe")),
                server(returnValueLater("Doe")),
        };
    }

    private <T> CompletableFuture<T> server(Supplier<T> supplier) {
        return supplyAsync(supplier);
    }

    private <T> Supplier<T> returnValue(T value) {
        return returnValue(() -> value);
    }


    private <T> Supplier<T> returnValueLater(T value) {
        return returnValue(() -> {
            Thread.sleep(500);
            return value;
        });
    }

    private <T> Supplier<T> returnValue(Callable<T> value) {
        return () -> {
            try {
                return value.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }

    private <T> Query<T> select(String query, Class<T> type) {
        return new Query<T>() {
            private CompletableFuture<T>[] upstreams;

            @Override
            public Query<T> from(CompletableFuture<T>... upstreams) {
                this.upstreams = upstreams;
                return this;
            }

            @Override
            public T getFirstResult() throws Exception {
                return selecting().get();
            }

            @Override
            public <R> R to(Function<T, R> mapper) throws Exception {
                return selecting().thenApply(mapper).get();
            }

            private CompletableFuture<T> selecting() {
                return upstreams(blocked(), this::selecting);
            }

            private CompletableFuture<T> selecting(CompletableFuture<T> primary,
                                                   CompletableFuture<T> upstream) {
                return primary.applyToEitherAsync(upstream, Function.identity());
            }

            private CompletableFuture<T> blocked() {
                return new CompletableFuture<>();
            }

            @Override
            public List<T> list() throws Exception {
                return upstreams(collector(), this::combine, this::combiner).get();
            }

            private CompletableFuture<List<T>> collector() {
                return completedFuture(new ArrayList<>());
            }

            private CompletableFuture<List<T>> combine(CompletableFuture<List<T>> primary,
                                                       CompletableFuture<T> upstream) {
                return primary.thenCombineAsync(upstream, this::concat);
            }

            private List<T> concat(List<T> result, T value) {
                result.add(value);
                return result;
            }

            private CompletableFuture<List<T>> combiner(CompletableFuture<List<T>> primary
                    , CompletableFuture<List<T>> secondary) {

                return primary.thenCombineAsync(secondary, this::concat);
            }

            private <T> List<T> concat(List<T> primary, List<T> secondary) {
                primary.addAll(secondary);
                return primary;
            }

            private CompletableFuture<T> upstreams(CompletableFuture<T> identity,
                                                   BinaryOperator<CompletableFuture<T>> accumulator) {
                return upstreams(identity, accumulator, accumulator);
            }

            private <U> CompletableFuture<U> upstreams(CompletableFuture<U> identity
                    , BiFunction<CompletableFuture<U>, CompletableFuture<T>, CompletableFuture<U>> accumulator
                    , BinaryOperator<CompletableFuture<U>> combiner) {
                return Stream.of(upstreams).reduce(identity, accumulator, combiner);
            }

        };
    }

    interface Query<T> {
        Query<T> from(CompletableFuture<T>... upstreams);

        T getFirstResult() throws Exception;

        <R> R to(Function<T, R> mapper) throws Exception;

        List<T> list() throws Exception;
    }
}
like image 36
holi-java Avatar answered Mar 11 '23 21:03

holi-java