Idea
I have a processing method which takes in a list of items and processes them asynchronously using an external web service. The processing steps also persist data as they go. At the end of the whole process, I want to persist the process itself along with each of the processed results.
Problem
I convert each item in the list into a CompletableFuture, run a processing task on it, and collect the results into a list of futures. The sequence method below then uses CompletableFuture.allOf to build a future that completes when all the submitted tasks have completed, and returns another CompletableFuture which holds the list of results.
To get that result I call .whenComplete(..), where I want to set the returned result on my entity as data and then persist it to the database. However, the repository save call does nothing: the threads just keep running and execution never gets past the save call.
@Transactional
public void process(List<Item> items) {
    List<Item> savedItems = itemRepository.save(items);
    final Process process = createNewProcess();
    final List<CompletableFuture<ProcessData>> futures = savedItems.stream()
        .map(item -> CompletableFuture.supplyAsync(() -> doProcess(item, process), executor))
        .collect(Collectors.toList());
    sequence(futures).whenComplete((data, throwable) -> {
        process.setData(data);
        processRepository.save(process); // <-- transaction lost?
        log.debug("Process DONE"); // <-- never reached
    });
}
Sequence method
private static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
    CompletableFuture<Void> allDoneFuture =
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
    return allDoneFuture.thenApply(v ->
        futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
    );
}
What is happening? Why does the persist call never complete? Is the thread that started the transaction unable to commit it, or where does the transaction get lost? All the processed data comes back fine. I've tried different transaction strategies, but if this is a threading issue, how can I control which thread finishes the transaction?
Any advice?
Instead of catching an exception in a syntactic block, the CompletableFuture class allows us to handle it in a special handle method. This method receives two parameters: a result of a computation (if it finished successfully), and the exception thrown (if some computation step did not complete normally).
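As a minimal sketch of the handle method described above: the class and the safeDivide helper are made up for illustration and are not part of the question's code. Note that when the failure happens inside supplyAsync, the throwable passed to handle is normally a CompletionException wrapping the original exception, so the sketch unwraps it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class HandleDemo {

    // Hypothetical helper: divides asynchronously and maps either outcome to a message.
    static CompletableFuture<String> safeDivide(int a, int b) {
        return CompletableFuture
                .supplyAsync(() -> a / b) // throws ArithmeticException when b == 0
                .handle((result, error) -> {
                    if (error != null) {
                        // Unwrap the CompletionException to get at the original cause.
                        Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                                ? error.getCause()
                                : error;
                        return "failed: " + cause.getClass().getSimpleName();
                    }
                    return "result: " + result;
                });
    }

    public static void main(String[] args) {
        System.out.println(safeDivide(10, 2).join()); // result: 5
        System.out.println(safeDivide(1, 0).join());  // failed: ArithmeticException
    }
}
```

Unlike whenComplete, handle returns a new value, so the chain continues normally after an error instead of propagating it.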
Future vs CompletableFuture. CompletableFuture, introduced in Java 8, extends Java's Future API, which was introduced in Java 5. A Future is a reference to the result of an asynchronous computation. CompletableFuture provides an easy way to write asynchronous, non-blocking, multi-threaded code.
A plain Future offers little beyond a get() method, which blocks the calling thread until the result is available, so callers cannot attach any further action to the result. With CompletableFuture you can build an asynchronous workflow: it lets you chain multiple calls, passing the result of one to the next.
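To illustrate the chaining point, here is a small sketch. fetchUserId and fetchGreeting are hypothetical stand-ins for remote calls; only thenCompose, thenApply and join are real CompletableFuture methods.

```java
import java.util.concurrent.CompletableFuture;

class ChainingDemo {

    // Hypothetical async lookup: here the "id" is just the length of the name.
    static CompletableFuture<Integer> fetchUserId(String name) {
        return CompletableFuture.supplyAsync(name::length);
    }

    // Hypothetical second async call that depends on the first result.
    static CompletableFuture<String> fetchGreeting(int userId) {
        return CompletableFuture.supplyAsync(() -> "hello user #" + userId);
    }

    static CompletableFuture<String> greet(String name) {
        return fetchUserId(name)
                .thenCompose(ChainingDemo::fetchGreeting) // feed one async result into the next async call
                .thenApply(String::toUpperCase);          // transform the final value without blocking
    }

    public static void main(String[] args) {
        // join() is only used here to print the result; the chain itself never blocks.
        System.out.println(greet("alice").join()); // HELLO USER #5
    }
}
```

With a plain Future, each step would need its own blocking get() before the next call could be submitted.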
In a Spring Boot application, you need the following configuration:
A configuration class (for example the main application class) should be annotated with @EnableAsync.
The method that carries the @Transactional annotation should also be annotated with @Async. This indicates that the processing will take place in a separate thread.
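The thread point matters because Spring binds a declarative transaction to the thread that entered the @Transactional method. The plain-Java sketch below is only an analogy, not Spring code: a ThreadLocal named CONTEXT (an assumption for illustration) stands in for the thread-bound transaction. It shows that work running on an executor thread does not see the caller's context, while waiting with join() and then continuing on the calling thread does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadBoundContextDemo {

    // Stand-in for a thread-bound transaction; not Spring's actual implementation.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Reads the context on a worker thread, where the caller's value is not visible.
    static String readInCallback(ExecutorService executor) {
        return CompletableFuture
                .supplyAsync(() -> String.valueOf(CONTEXT.get()), executor)
                .join();
    }

    // Waits for the async work, then reads the context back on the calling thread.
    static String readAfterJoin(ExecutorService executor) {
        String data = CompletableFuture.supplyAsync(() -> "data", executor).join();
        return data + " saved with " + CONTEXT.get();
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            CONTEXT.set("tx-1");
            System.out.println(readInCallback(executor)); // null
            System.out.println(readAfterJoin(executor));  // data saved with tx-1
        } finally {
            CONTEXT.remove();
            executor.shutdown();
        }
    }
}
```

Applied to the question, one option under these assumptions would be to call join() on the future returned by sequence(futures) and save the process on the calling thread, at the cost of blocking that thread until all tasks finish.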