I am trying to implement a database query with retry strategy on certain database exceptions. The code for retry strategy is not very relevant, so I did not include it. As you can see in the code below - I have written a retryCallable which takes the retry strategy and the Callable in populateData()
.
In getDataFromDB
, I get the data from DB and put the data in a global hashmap which serves as a cache at an application level.
This code is working as expected. I would like to invoke populateData
from a different class. However, this would be a blocking call. Since this is Database and has retry strategy, this could be slow. I want to call populateData
asynchronously.
How can I use CompletableFuture or FutureTask to achieve this?
CompletableFuture.runAsync
expects a runnable. CompletableFuture.supplyAsync
expects a supplier. I have not implemented these things before. So any advice on best practices would be helpful.
Class TestCallableRetry {
public void populateData() {
final Callable<Set<String>> retryCallable = new RetryingCallable<>(retryStrategyToRetryOnDBException(), getDataFromDB());
Set<String> data = new HashSet<>();
data = retryCallable.call();
if (data != null && !data.isEmpty()) {
// store data in a global hash map
}
}
private Callable<Set<Building>> getDataFromDB() {
return new Callable<Set<String>>() {
@Override
public Set<String> call() {
// returns data from database
}
};
}
}
Class InvokeCallableAsynchronously {
public void putDataInGlobalMap {
// call populateData asynchronously
}
}
CompletableFuture is at the same time a building block and a framework, with about 50 different methods for composing, combining, and executing asynchronous computation steps and handling errors. Such a large API can be overwhelming, but these mostly fall in several clear and distinct use cases.
runAsync. Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor after it runs the given action.
The difference between runAsync() and supplyAsync() is that the former returns a Void while supplyAsync() returns a value obtained by the Supplier. Both methods also support a second input argument — a custom Executor to submit tasks to.
It just provides a get() method which blocks until the result is available to the main thread. Ultimately, it restricts users from applying any further action on the result. You can create an asynchronous workflow with CompletableFuture. It allows chaining multiple APIs, sending ones to result to another.
If you split your populateData
method into two parts, one Supplier
to fetch the data and another Consumer
to store it, it will be easy to chain them with a CompletableFuture
.
// Signature compatible with Supplier<Set<String>>
private Set<String> fetchDataWithRetry() {
final RetryingCallable<Set<String>> retryCallable = new RetryingCallable<>(retryStrategyToRetryOnDBException(), getDataFromDB());
try {
return retryCallable.call();
} catch (Exception e) {
log.error("Call to database failed", e);
return Collections.emptySet();
}
}
// Signature compatible with Consumer<Set<String>>
private void storeData(Set<String> data) {
if (!data.isEmpty()) {
// store data in a global hash map
}
}
Then, in populateData()
:
private ExecutorService executor = Executors.newCachedThreadPool();
public void populateData() {
CompletableFuture
.supplyAsync(this::fetchDataWithRetry, executor)
.thenAccept(this::storeData);
}
The use of the version of supplyAsync
that takes an Executor
is optional. If you use the single arg version your task will run in the common pool; OK for short running tasks but not for tasks that block.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With