Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

CompletableFuture to execute multiple DB queries asynchronously

I want to execute multiple DB queries parallelly and store the results in a map. I am trying to do it like this but the map is not getting populated completely when I am accessing the map.

Am I doing anything wrong?

 public Map<MapKeyEnums, Set<String>> doDBCalls(String phoneNumber, long timestamp) {

         Map<MapKeyEnums, Set<String>> instrumentsEdgesMap = new EnumMap<>(MapKeyEnums.class);

         CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "ABC", timestamp)).
                    thenApply(x -> instrumentsEdgesMap.put(MapKeyEnums.ABC, x));

         CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "XYZ", timestamp)).
                    thenApply(x -> instrumentsEdgesMap.put(MapKeyEnums.XYZ, x));

         CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "DEF", timestamp)).
                    thenApply(x -> instrumentsEdgesMap.put(MapKeyEnums.DEF, x));

         return instrumentsEdgesMap;

}

Any help will be much appreciated, thanks in advance.

like image 940
The-Proton-Resurgence Avatar asked Nov 14 '19 20:11

The-Proton-Resurgence


2 Answers

In the above approach supplyAsync will be executed by the Async thread from ForkJoinPool, but thenApply method is always executed by calling thread. So your queries will run one after the another in sequence which it is not Asynchronous

All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool() (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task).

Here is the example

CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread().getName());
        return "SupplyAsync";
    }).thenAccept(i->{
    System.out.println(Thread.currentThread().getName()+"--"+i);
    });

Output :

ForkJoinPool.commonPool-worker-3
main--SupplyAsync

So if you want your process to be Async then first trigger all three db queries with supplyAsync and capture the output within CompletableFuture

CompletableFuture<Set<String>> first =  CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "ABC", timestamp));

CompletableFuture<Set<String>> second =  CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "XYZ", timestamp));

CompletableFuture<Set<String>> third =  CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "DEF", timestamp));

And then now create a stream with three of them and then collect them to Map

Stream.of(new AbstractMap.SimpleEntry<MapKeyEnums, CompletableFuture<Set<String>>>(MapKeyEnums.ABC, first),
              new AbstractMap.SimpleEntry<MapKeyEnums, CompletableFuture<Set<String>>>(MapKeyEnums.XYZ, second),
              new AbstractMap.SimpleEntry<MapKeyEnums, CompletableFuture<Set<String>>>(MapKeyEnums.DEF, third))
       .forEach(entry->{
           entry.getValue().thenAccept(val-> instrumentsEdgesMap.put(entry.getKey(), val));
       });
like image 188
Deadpool Avatar answered Sep 23 '22 16:09

Deadpool


You have to wait for the futures to complete before you return the result.

Try something like

    public Map<MapKeyEnums, Set<String>> doDBCalls(String phoneNumber, long timestamp) {

        Map<MapKeyEnums, Set<String>> instrumentsEdgesMap = new EnumMap<>(MapKeyEnums.class);

        CompletableFuture.allOf(
            CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "ABC", timestamp))
                .thenAccept(x -> instrumentsEdgesMap.put(MapKeyEnums.ABC, x)),

            CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "XYZ", timestamp))
                .thenAccept(x -> instrumentsEdgesMap.put(MapKeyEnums.XYZ, x)),

            CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "DEF", timestamp))
                .thenAccept(x -> instrumentsEdgesMap.put(MapKeyEnums.DEF, x)))
        .get(); // wait for completion of all three subtasks

        return instrumentsEdgesMap;
    }

like image 26
aventurin Avatar answered Sep 23 '22 16:09

aventurin