I have three functions updateFieldFromCollection1()
, insertFromColletion1ToCollection2()
and deleteFromCollection1()
now.
These calls aren't as such dependent on each other but when I want to chain them in completableFuture they have to run one after the other in the given order.
update-> insert -> delete
The calls don't return anything so i am using runAsync and thenRun methods of completableFuture. And have chained them accordingly. I am iterating on msgIds which is a list of strings.
msgIds.stream().forEach(msgId -> CompletableFuture.runAsync(() ->
{updateFieldFromCollection1()}).thenRun(() ->
{insertFromColletion1ToCollection2()}).thenRun(() ->
{deleteFromCollection1()}));
The above code works (update-> insert -> delete are done) but throws exception like duplicateKey and bulkwrite. I am sure that the problem comes because another thread is starting before the previous thread completes its task. I want them to run asynchronously but I want to discipline the threads by avoiding conflicts.
I am not sure where I need to tweak the code.
The most generic way to process the result of a computation is to feed it to a function. The thenApply method does exactly that; it accepts a Function instance, uses it to process the result, and returns a Future that holds a value returned by a function: CompletableFuture<String> completableFuture = CompletableFuture.
CompletableFuture executes these tasks in a thread obtained from the global ForkJoinPool. commonPool(). But we can also create a Thread Pool and pass it to runAsync() and supplyAsync() methods to let them execute their tasks in a thread obtained from our thread pool.
The CompletableFuture. get() method is blocking. It waits until the Future is completed and returns the result after its completion.
As a result: Future transferes single value using synchronous interface. CompletableFuture transferes single value using both synchronous and asynchronous interfaces. Rx transferes multiple values using asynchronous interface with backpressure.
msgIds.stream().forEach(msgId -> {
updateFieldFromCollection1();
CompletableFuture.runAsync(() ->
{insertFromColletion1ToCollection2()}).thenRun(() ->
{deleteFromCollection1()}));
The update had to be made outside completableFuture. Then it works like a charm :)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With