I want to execute multiple DB queries parallelly and store the results in a map. I am trying to do it like this but the map is not getting populated completely when I am accessing the map.
Am I doing anything wrong?
public Map<MapKeyEnums, Set<String>> doDBCalls(String phoneNumber, long timestamp) {
Map<MapKeyEnums, Set<String>> instrumentsEdgesMap = new EnumMap<>(MapKeyEnums.class);
CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "ABC", timestamp)).
thenApply(x -> instrumentsEdgesMap.put(MapKeyEnums.ABC, x));
CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "XYZ", timestamp)).
thenApply(x -> instrumentsEdgesMap.put(MapKeyEnums.XYZ, x));
CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "DEF", timestamp)).
thenApply(x -> instrumentsEdgesMap.put(MapKeyEnums.DEF, x));
return instrumentsEdgesMap;
}
Any help will be much appreciated, thanks in advance.
In the above approach supplyAsync
will be executed by the Async
thread from ForkJoinPool, but thenApply
method is always executed by calling thread. So your queries will run one after the another in sequence which it is not Asynchronous
All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool() (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task).
Here is the example
CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName());
return "SupplyAsync";
}).thenAccept(i->{
System.out.println(Thread.currentThread().getName()+"--"+i);
});
Output :
ForkJoinPool.commonPool-worker-3
main--SupplyAsync
So if you want your process to be Async
then first trigger all three db queries with supplyAsync
and capture the output within CompletableFuture
CompletableFuture<Set<String>> first = CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "ABC", timestamp));
CompletableFuture<Set<String>> second = CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "XYZ", timestamp));
CompletableFuture<Set<String>> third = CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "DEF", timestamp));
And then now create a stream with three of them and then collect them to Map
Stream.of(new AbstractMap.SimpleEntry<MapKeyEnums, CompletableFuture<Set<String>>>(MapKeyEnums.ABC, first),
new AbstractMap.SimpleEntry<MapKeyEnums, CompletableFuture<Set<String>>>(MapKeyEnums.XYZ, second),
new AbstractMap.SimpleEntry<MapKeyEnums, CompletableFuture<Set<String>>>(MapKeyEnums.DEF, third))
.forEach(entry->{
entry.getValue().thenAccept(val-> instrumentsEdgesMap.put(entry.getKey(), val));
});
You have to wait for the futures to complete before you return the result.
Try something like
public Map<MapKeyEnums, Set<String>> doDBCalls(String phoneNumber, long timestamp) {
Map<MapKeyEnums, Set<String>> instrumentsEdgesMap = new EnumMap<>(MapKeyEnums.class);
CompletableFuture.allOf(
CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "ABC", timestamp))
.thenAccept(x -> instrumentsEdgesMap.put(MapKeyEnums.ABC, x)),
CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "XYZ", timestamp))
.thenAccept(x -> instrumentsEdgesMap.put(MapKeyEnums.XYZ, x)),
CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "DEF", timestamp))
.thenAccept(x -> instrumentsEdgesMap.put(MapKeyEnums.DEF, x)))
.get(); // wait for completion of all three subtasks
return instrumentsEdgesMap;
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With