After reading this article on the Oracle site, https://community.oracle.com/docs/DOC-995305, I'm trying to implement the pattern described in the "Some Two-to-One Selecting Patterns" paragraph:
This last category of patterns also contains two-to-one patterns. But this time, instead of executing the downstream element once the two upstream elements are completed, the downstream element is executed when one of the two upstream elements is completed. This might prove very useful when we want to resolve a domain name, for instance. Instead of querying only one domain name server, we might find it more efficient to query a group of domain name servers. We do not expect to have different results from the different servers, so we do not need more answers than the first we get. All the other queries can be safely canceled.
It's simple to implement this scenario with only two CompletableFutures, but I'm not able to implement the same scenario with three or more CompletableFutures.
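For reference, the two-future case can be sketched as follows. This is only a minimal illustration: the class EitherOfTwo and the helper firstOf are names made up here, and the never-completed future simply stands in for a slow server.

```java
import java.util.concurrent.CompletableFuture;

class EitherOfTwo {

    // Returns a future that completes with the result of whichever input finishes first.
    static <T> CompletableFuture<T> firstOf(CompletableFuture<T> a, CompletableFuture<T> b) {
        return a.applyToEither(b, result -> result);
    }

    public static void main(String[] args) {
        // Stand-in for a slow server: this future is never completed in this sketch.
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        System.out.println(firstOf(slow, fast).join()); // prints "fast"
    }
}
```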
I tried this:
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF1"));
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF2"));
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF3"));
CompletableFuture<String> cf4 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF4"));
cf1.applyToEither(
cf2, s1 -> cf2.applyToEither(
cf3, s2 -> cf3.applyToEither(
cf4, s3 -> "First result is: " + s3))).thenAccept(System.out::println).join();
FutureMain is my class, and this is its generateString method:
public static String generateString(String input) {
Random r = new Random();
int millis = r.nextInt(6) * 1000;
System.out.println(input + " " + millis);
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
return input + ": " + millis;
}
I successfully combined multiple CompletableFutures for the case where I want all of them to be completed:
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF1"));
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF2"));
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF3"));
CompletableFuture<String> cf4 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF4"));
CompletableFuture<String> cf5 = CompletableFuture.allOf(cf1, cf2, cf3, cf4).thenApply(
s -> elaborate(cf1.join(), cf2.join(), cf3.join(), cf4.join()));
cf5.thenAccept(System.out::println).join();
Any suggestion?
The “two-to-one” pattern doesn’t scale well to arbitrary numbers. That’s why convenience methods like allOf and anyOf exist. Since you noticed the former, it’s not clear why you ignored the latter:
CompletableFuture<String> cf1
= CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF1"));
CompletableFuture<String> cf2
= CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF2"));
CompletableFuture<String> cf3
= CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF3"));
CompletableFuture<String> cf4
= CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF4"));
CompletableFuture<String> cf5 = CompletableFuture.anyOf(cf1, cf2, cf3, cf4)
.thenApply(String.class::cast);
cf5.thenAccept(System.out::println).join();
This does the job of completing as soon as the first of the futures has completed.
The disadvantage of this method is that it always prefers the first completed, whether it completed exceptionally or not. An alternative, which completes upon the first non-exceptional completion and only completes exceptionally if all futures completed exceptionally, has been shown in this answer:
public static <T>
CompletableFuture<T> anyOf(List<? extends CompletionStage<? extends T>> l) {
CompletableFuture<T> f=new CompletableFuture<>();
Consumer<T> complete=f::complete;
CompletableFuture.allOf(
l.stream().map(s -> s.thenAccept(complete)).toArray(CompletableFuture<?>[]::new)
).exceptionally(ex -> { f.completeExceptionally(ex); return null; });
return f;
}
It also makes the type cast obsolete:
CompletableFuture<String> cf1
= CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF1"));
CompletableFuture<String> cf2
= CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF2"));
CompletableFuture<String> cf3
= CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF3"));
CompletableFuture<String> cf4 // to demonstrate that this quick failure is not preferred
= CompletableFuture.supplyAsync(() -> { throw new RuntimeException(); });
CompletableFuture<String> cf5 = anyOf(Arrays.asList(cf1, cf2, cf3, cf4));
cf5.thenAccept(System.out::println).join();
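To see the disadvantage of the built-in anyOf in isolation, here is a small sketch with one future that has already failed and one that only succeeds afterwards. The class name BuiltInAnyOfDemo, the method outcome and the "boom" message are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class BuiltInAnyOfDemo {

    // Describes how CompletableFuture.anyOf settles when one input has already failed.
    static String outcome() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // completed manually below
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        CompletableFuture<Object> first = CompletableFuture.anyOf(pending, failed);
        pending.complete("late success"); // too late: anyOf has already settled on the failure
        try {
            return "value: " + first.join();
        } catch (CompletionException e) {
            // join() wraps the original exception in a CompletionException
            return "failed with: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome()); // prints "failed with: boom"
    }
}
```

With the custom anyOf shown above, the same two futures would be expected to yield the late success instead, since the failure is only reported once every future has failed.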
The "Two-to-One Selecting Patterns" section says:
the downstream element is executed when one of the two upstream elements is completed.
For example, when selecting a user from two servers, one server may return a user right away while the other is blocked or, for some reason, returns a user later. Regardless of which server returns a user first, the downstream element will be executed.
//the first upstream is always blocked.
CompletableFuture<String> blocked = new CompletableFuture<>();
CompletableFuture<String> upstreams = Stream.of(cf1, cf2, cf3, cf4).reduce(blocked,
(it, upstream) -> it.applyToEither(upstream, Function.identity()));
upstreams.thenAccept(System.out::println).join();// print "foo"
I statically imported the supplyAsync method from CompletableFuture for formatting reasons.
CompletableFuture<String> cf1 = supplyAsync(blocked(String.class));
CompletableFuture<String> cf2 = supplyAsync(returnValueLater("bar"));
CompletableFuture<String> cf3 = supplyAsync(blocked(String.class));
CompletableFuture<String> cf4 = supplyAsync(returnValue("foo"));
CompletableFuture<String> upstreams = cf1.applyToEither(cf2, Function.identity())
.applyToEither(cf3, Function.identity())
.applyToEither(cf4, Function.identity());
upstreams.thenAccept(System.out::println).join();// print "foo"
private <T> Supplier<T> returnValue(T value) {
return returnValue(() -> value);
}
private <T> Supplier<T> blocked(Class<T> type) {
return returnValue(() -> {
Thread.currentThread().join();
return null;
});
}
private <T> Supplier<T> returnValueLater(T value) {
return returnValue(() -> {
Thread.sleep(100);
return value;
});
}
private <T> Supplier<T> returnValue(Callable<T> value) {
return () -> {
try {
return value.call();
} catch (Exception e) { throw new RuntimeException(e); }
};
}
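The article quoted in the question notes that the remaining queries can be cancelled once the first answer has arrived. A possible sketch of that, building on the reduce/applyToEither idea above, follows; CancelLosers and firstThenCancelRest are hypothetical names. Note that CompletableFuture.cancel only completes the future with a CancellationException. It does not interrupt the thread running the task, so a supplier that has already started keeps running.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class CancelLosers {

    // Completes with the first available result, then cancels every input that is still pending.
    static <T> CompletableFuture<T> firstThenCancelRest(List<CompletableFuture<T>> upstreams) {
        CompletableFuture<T> blocked = new CompletableFuture<>(); // never completes on its own
        CompletableFuture<T> first = upstreams.stream()
                .reduce(blocked, (it, upstream) -> it.applyToEither(upstream, Function.identity()));
        // The returned future completes only after the cancellation step has run.
        // cancel(true) has no effect on futures that are already done.
        return first.whenComplete((value, error) -> upstreams.forEach(f -> f.cancel(true)));
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> done = CompletableFuture.completedFuture("foo");
        System.out.println(firstThenCancelRest(Arrays.asList(pending, done)).join()); // prints "foo"
        System.out.println(pending.isCancelled()); // prints "true"
    }
}
```

Like the chained applyToEither calls, this still settles on the first completion even if it is an exceptional one.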
import org.junit.jupiter.api.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.stream.*;
import static java.util.Arrays.asList;
import static java.util.concurrent.CompletableFuture.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
public class CompletableFuturePatternTest {
@Test @DisplayName("Two-to-One Selecting Pattern")
void selectingManyToOne() throws Throwable {
String user = select("select user from User", String.class)
.from(availableServers())
.getFirstResult();
assertThat(user, equalTo("Joe"));
}
@Test @DisplayName("Two-to-One Combining Pattern")
void combiningManyToOne() throws Throwable {
List<String> users = select("select user from User", String.class)
.from(availableServers())
.list();
assertThat(users, equalTo(asList("Bob", "Joe", "Doe")));
}
@Test @DisplayName("One-to-One Pattern")
void waitUntilUpstreamCompleted() throws Throwable {
String user = select("select user from User", String.class)
.from(availableServers())
.to(String::toUpperCase);
assertThat(user, equalTo("JOE"));
}
private CompletableFuture<String>[] availableServers() {
return new CompletableFuture[]{
server(returnValueLater("Bob")),
server(returnValue("Joe")),
server(returnValueLater("Doe")),
};
}
private <T> CompletableFuture<T> server(Supplier<T> supplier) {
return supplyAsync(supplier);
}
private <T> Supplier<T> returnValue(T value) {
return returnValue(() -> value);
}
private <T> Supplier<T> returnValueLater(T value) {
return returnValue(() -> {
Thread.sleep(500);
return value;
});
}
private <T> Supplier<T> returnValue(Callable<T> value) {
return () -> {
try {
return value.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
};
}
private <T> Query<T> select(String query, Class<T> type) {
return new Query<T>() {
private CompletableFuture<T>[] upstreams;
@Override
public Query<T> from(CompletableFuture<T>... upstreams) {
this.upstreams = upstreams;
return this;
}
@Override
public T getFirstResult() throws Exception {
return selecting().get();
}
@Override
public <R> R to(Function<T, R> mapper) throws Exception {
return selecting().thenApply(mapper).get();
}
private CompletableFuture<T> selecting() {
return upstreams(blocked(), this::selecting);
}
private CompletableFuture<T> selecting(CompletableFuture<T> primary,
CompletableFuture<T> upstream) {
return primary.applyToEitherAsync(upstream, Function.identity());
}
private CompletableFuture<T> blocked() {
return new CompletableFuture<>();
}
@Override
public List<T> list() throws Exception {
return upstreams(collector(), this::combine, this::combiner).get();
}
private CompletableFuture<List<T>> collector() {
return completedFuture(new ArrayList<>());
}
private CompletableFuture<List<T>> combine(CompletableFuture<List<T>> primary,
CompletableFuture<T> upstream) {
return primary.thenCombineAsync(upstream, this::concat);
}
private List<T> concat(List<T> result, T value) {
result.add(value);
return result;
}
private CompletableFuture<List<T>> combiner(CompletableFuture<List<T>> primary
, CompletableFuture<List<T>> secondary) {
return primary.thenCombineAsync(secondary, this::concat);
}
private <T> List<T> concat(List<T> primary, List<T> secondary) {
primary.addAll(secondary);
return primary;
}
private CompletableFuture<T> upstreams(CompletableFuture<T> identity,
BinaryOperator<CompletableFuture<T>> accumulator) {
return upstreams(identity, accumulator, accumulator);
}
private <U> CompletableFuture<U> upstreams(CompletableFuture<U> identity
, BiFunction<CompletableFuture<U>, CompletableFuture<T>, CompletableFuture<U>> accumulator
, BinaryOperator<CompletableFuture<U>> combiner) {
return Stream.of(upstreams).reduce(identity, accumulator, combiner);
}
};
}
interface Query<T> {
Query<T> from(CompletableFuture<T>... upstreams);
T getFirstResult() throws Exception;
<R> R to(Function<T, R> mapper) throws Exception;
List<T> list() throws Exception;
}
}