I was wondering what the best way is to implement fan-out functionality with Java 8's CompletableFuture. I recently rewrote a function that created a bunch of old Future instances and then called get() in a loop, blocking on each one, into a somewhat cleaner variant using CompletableFuture. However, I am seeing about a 2x drop in performance, so I assume something is not quite right in the way I'm using the new API. The code looks something like this:
if (!client.login()) {
throw new LoginException("There was a login error");
}
CompletableFuture<List<String>> smths = CompletableFuture
.supplyAsync(client::getSmth);
CompletableFuture<List<Data>> smths2 = smths.thenApply(client::getInformation)
.thenApplyAsync((list) -> list.stream().map(obj -> mapper.map(obj, Data.class)).collect(toList()));
List<CompletableFuture<Map<String, AnotherData>>> waitGroup = new ArrayList<>();
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getIvPercentileM12M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getIvPercentileM6M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getIvPercentile2M6M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getIvPercentile2M12M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getIvPercentile2M24M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getHvPercentileM6M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getHvPercentile2M6M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getHvPercentileM12M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getHvPercentile2M12M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getHvPercentile2M24M));
CompletableFuture
.allOf(waitGroup.toArray(new CompletableFuture[waitGroup.size()]));
List<Data> data = smths2.join();
Map<String, Set<AnotherData>> volPercent = waitGroup.stream()
.map(CompletableFuture::join)
.flatMap((e) -> e.entrySet().stream())
.collect(groupingBy(Map.Entry::getKey,
mapping(Map.Entry::getValue,
toSet())));
data.forEach((d) -> {
Set<AnotherData> asdasd = volPercent.get(d.getSymbol());
if (asdasd != null) {
d.add(asdasd);
}
});
return stocks;
client::getInformation is a blocking network call returning a List. All the clientb.* methods do is something like:
return CompletableFuture.supplyAsync(() -> blockingNetworkCall(params, symbols)
.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, value -> new Data(value.getValue(), TimePeriod.M1, TimePeriod.Y1))));
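A side note on the snippet above: when supplyAsync is called without an executor, the task normally runs on the shared ForkJoinPool.commonPool(), whose default parallelism is tied to the number of CPU cores. If ten blocking network calls are queued there at once, some of them may have to wait for a free thread, which could be one contributor to a slowdown like the one described. Below is a minimal sketch of the alternative, assuming a dedicated pool sized for the number of concurrent calls. DedicatedPoolFanOut, blockingCall and fetchAll are hypothetical names, and Thread.sleep stands in for the network:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DedicatedPoolFanOut {

    // Hypothetical stand-in for a blocking network call.
    static String blockingCall(int id) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "result-" + id;
    }

    // Starts `calls` blocking tasks on a pool large enough to run them all at once.
    static List<String> fetchAll(int calls) {
        ExecutorService ioPool = Executors.newFixedThreadPool(calls);
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (int i = 0; i < calls; i++) {
                final int id = i;
                // The second argument keeps the blocking work off the common pool.
                futures.add(CompletableFuture.supplyAsync(() -> blockingCall(id), ioPool));
            }
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> f : futures) {
                results.add(f.join());
            }
            return results;
        } finally {
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> results = fetchAll(10);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(results.size() + " results in " + elapsedMs + " ms");
    }
}
```

Whether this explains the 22 vs 45 second difference depends on the machine and on how the old Future-based client scheduled its calls, so it is worth measuring both variants with the same executor.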
The original code looked something like this:
List<String> symbols = client.block().get();
Future<Map<String, Data>> smth = client.block2(symbols);
Future<Map<String, Double>> ivM6MResultsFuture = clientB.getIvdataM6M(symbols);
Future<Map<String, Double>> ivM12MResultsFuture = clientB.getIvdataM12M(symbols);
Future<Map<String, Double>> iv2M6MResultsFuture = clientB.getIvdata2M6M(symbols);
Future<Map<String, Double>> iv2M12MResultsFuture = clientB.getIvdata2M12M(symbols);
Future<Map<String, Double>> iv2M24MResultsFuture = clientB.getIvdata2M24M(symbols);
Future<Map<String, Double>> hvM6MResultsFuture = clientB.getHvdataM6M(symbols);
Future<Map<String, Double>> hvM12MResultsFuture = clientB.getHvdataM12M(symbols);
Future<Map<String, Double>> hv2M6MResultsFuture = clientB.getHvdata2M6M(symbols);
Future<Map<String, Double>> hv2M12MResultsFuture = clientB.getHvdata2M12M(symbols);
Future<Map<String, Double>> hv2M24MResultsFuture = clientB.getHvdata2M24M(symbols);
Map<String, Data> doughResults = smth.get();
Map<String, Double> ivM6MResults = ivM6MResultsFuture.get();
Map<String, Double> ivM12MResults = ivM12MResultsFuture.get();
Map<String, Double> iv2M6MResults = iv2M6MResultsFuture.get();
Map<String, Double> iv2M12MResults = iv2M12MResultsFuture.get();
Map<String, Double> iv2M24MResults = iv2M24MResultsFuture.get();
Map<String, Double> hvM6MResults = hvM6MResultsFuture.get();
Map<String, Double> hvM12MResults = hvM12MResultsFuture.get();
Map<String, Double> hv2M6MResults = hv2M6MResultsFuture.get();
Map<String, Double> hv2M12MResults = hv2M12MResultsFuture.get();
Map<String, Double> hv2M24MResults = hv2M24MResultsFuture.get();
followed by a big for loop to map all the futures together and aggregate a result. Hopefully it's clear from the code what it's doing, but essentially:
Two main problems:
Do you see any problems with my CompletableFuture usage, and is there room to improve the implementation based on the outlined criteria? Currently it is about 2x slower than the old Futures with regular blocking .get() calls, which are given as a reference.
I am a little annoyed by the way joining is done, having to call .allOf() with a void result. Is there a better way to do that in the API that I am missing?
As a side note, I realize I'm doing a bit more work in the Java 8 variant, with a bunch of streams and mapping happening, but the time goes from 22 seconds in the old version to 45 seconds in the new one for about 200 items in total, so the majority is actually spent on networking and waiting rather than on the stream operations.
Thanks!
It's difficult to say, as some parts of the code are missing, but I would avoid the .join() calls (as they block) and instead iterate through waitGroup and use thenCombineAsync, passing smths2. Something like:
Stream<CompletableFuture<AnotherData>> map =
waitGroup.stream().map(
cf -> cf.thenCombineAsync(
smths2, (m, l) -> doWhatever(m, l)
));
Just an idea...
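To make the idea a bit more concrete, and to touch on the question about allOf() returning void: one common pattern is a small helper that wraps allOf() and collects the results once everything has completed, which can then be combined with the other future without blocking. This is only a sketch under assumptions. FanOutJoin, sequence, lookup, combine and demo are hypothetical names, Integer values stand in for AnotherData, and the merge logic is purely illustrative:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class FanOutJoin {

    // Turns a list of futures into one future holding all results, in order.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                // Every future is already complete here, so join() does not block.
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    // Hypothetical async lookup: maps every symbol to the given value.
    static CompletableFuture<Map<String, Integer>> lookup(List<String> symbols, int value) {
        return CompletableFuture.supplyAsync(() -> {
            Map<String, Integer> m = new HashMap<>();
            for (String s : symbols) {
                m.put(s, value);
            }
            return m;
        });
    }

    // Merges the fanned-out maps with the symbol list once both sides are done.
    static CompletableFuture<Map<String, Integer>> combine(
            CompletableFuture<List<String>> symbols,
            List<CompletableFuture<Map<String, Integer>>> waitGroup) {
        return sequence(waitGroup).thenCombine(symbols, (maps, syms) -> {
            Map<String, Integer> totals = new HashMap<>();
            for (String s : syms) {
                int sum = 0;
                for (Map<String, Integer> m : maps) {
                    sum += m.getOrDefault(s, 0);
                }
                totals.put(s, sum);
            }
            return totals;
        });
    }

    // Wires the pieces together: one source future, three dependent lookups.
    static Map<String, Integer> demo() {
        CompletableFuture<List<String>> symbols =
                CompletableFuture.supplyAsync(() -> Arrays.asList("A", "B"));
        List<CompletableFuture<Map<String, Integer>>> waitGroup = new ArrayList<>();
        waitGroup.add(symbols.thenCompose(s -> lookup(s, 1)));
        waitGroup.add(symbols.thenCompose(s -> lookup(s, 2)));
        waitGroup.add(symbols.thenCompose(s -> lookup(s, 3)));
        // The only blocking call is this final join at the edge of the pipeline.
        return combine(symbols, waitGroup).join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With this shape, the caller can also return the combined CompletableFuture itself instead of joining, if the surrounding code is able to work with a future.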