Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

CompletableFuture : Invoke a void function asynchronusly

I am trying to implement a database query with retry strategy on certain database exceptions. The code for retry strategy is not very relevant, so I did not include it. As you can see in the code below - I have written a retryCallable which takes the retry strategy and the Callable in populateData().

In getDataFromDB, I get the data from DB and put the data in a global hashmap which serves as a cache at an application level.

This code is working as expected. I would like to invoke populateData from a different class. However, this would be a blocking call. Since this is Database and has retry strategy, this could be slow. I want to call populateData asynchronously.

How can I use CompletableFuture or FutureTask to achieve this? CompletableFuture.runAsync expects a runnable. CompletableFuture.supplyAsync expects a supplier. I have not implemented these things before. So any advice on best practices would be helpful.

Class TestCallableRetry {

public void populateData() {
        final Callable<Set<String>> retryCallable = new RetryingCallable<>(retryStrategyToRetryOnDBException(), getDataFromDB());
        Set<String> data = new HashSet<>();

        data = retryCallable.call();

        if (data != null && !data.isEmpty()) {
            // store data in a global hash map
        }
    }

    private Callable<Set<Building>> getDataFromDB() {
        return new Callable<Set<String>>() {
            @Override
            public Set<String> call() {
                // returns data from database
            }
        };
    }
}

Class InvokeCallableAsynchronously {
    public void putDataInGlobalMap {
      // call populateData asynchronously
    }
}
like image 855
maddie Avatar asked Mar 13 '18 02:03

maddie


People also ask

Is CompletableFuture asynchronous?

CompletableFuture is at the same time a building block and a framework, with about 50 different methods for composing, combining, and executing asynchronous computation steps and handling errors. Such a large API can be overwhelming, but these mostly fall in several clear and distinct use cases.

What does CompletableFuture run async do?

runAsync. Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor after it runs the given action.

What is difference between supplyAsync and runAsync?

The difference between runAsync() and supplyAsync() is that the former returns a Void while supplyAsync() returns a value obtained by the Supplier. Both methods also support a second input argument — a custom Executor to submit tasks to.

Is CompletableFuture get blocking?

It just provides a get() method which blocks until the result is available to the main thread. Ultimately, it restricts users from applying any further action on the result. You can create an asynchronous workflow with CompletableFuture. It allows chaining multiple APIs, sending ones to result to another.


1 Answers

If you split your populateData method into two parts, one Supplier to fetch the data and another Consumer to store it, it will be easy to chain them with a CompletableFuture.

// Signature compatible with Supplier<Set<String>> 
private Set<String> fetchDataWithRetry() {
    final RetryingCallable<Set<String>> retryCallable = new RetryingCallable<>(retryStrategyToRetryOnDBException(), getDataFromDB());
    try {
        return retryCallable.call();
    } catch (Exception e) {
        log.error("Call to database failed", e);
        return Collections.emptySet();
    }
}

// Signature compatible with Consumer<Set<String>>
private void storeData(Set<String> data) {
    if (!data.isEmpty()) {
        // store data in a global hash map
    }
}

Then, in populateData():

private ExecutorService executor = Executors.newCachedThreadPool();

public void populateData() {
    CompletableFuture
        .supplyAsync(this::fetchDataWithRetry, executor)
        .thenAccept(this::storeData);
}

The use of the version of supplyAsync that takes an Executor is optional. If you use the single arg version your task will run in the common pool; OK for short running tasks but not for tasks that block.

like image 82
teppic Avatar answered Sep 30 '22 09:09

teppic