Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

CompletableFuture#whenComplete not called if thenApply is used

I have the following code (resulting from my previous question) that schedules a task on a remote server, and then polls for completion using ScheduledExecutorService#scheduleAtFixedRate. Once the task is complete, it downloads the result. I want to return a Future to the caller so they can decide when and how long to block, and give them the option to cancel the task.

My problem is that if the client cancels the Future returned by the download method, whenComplete block doesn't execute. If I remove thenApply it does. It's obvious I'm misunderstanding something about Future composition... What should I change?

public Future<Object> download(Something something) {
    String jobId = schedule(something);
    CompletableFuture<String> job = pollForCompletion(jobId);
    return job.thenApply(this::downloadResult);
}

private CompletableFuture<String> pollForCompletion(String jobId) {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    CompletableFuture<String> completionFuture = new CompletableFuture<>();

    ScheduledFuture<?> checkFuture = executor.scheduleAtFixedRate(() -> {           
            if (pollRemoteServer(jobId).equals("COMPLETE")) {
                completionFuture.complete(jobId);
            }
    }, 0, 10, TimeUnit.SECONDS);
    completionFuture                
            .whenComplete((result, thrown) -> {
                System.out.println("XXXXXXXXXXX"); //Never happens unless thenApply is removed
                checkFuture.cancel(true);
                executor.shutdown();
            });
    return completionFuture;
}

On the same note, if I do:

return completionFuture.whenComplete(...)

instead of

completionFuture.whenComplete(...);
return completionFuture;

whenComplete also never executes. This seems very counterintuitive to me. Shouldn't logically the Future returned by whenComplete be the one I should hold on to?

EDIT:

I changed my code to explicitly back-propagate the cancellation. It's abhorrent and unreadable, but it works and I couldn't find a better way:

public Future<Object> download(Something something) throws ChartDataGenException, Exception {
        String jobId = schedule(something);
        CompletableFuture<String> job = pollForCompletion(jobId);
        CompletableFuture<Object> resulting = job.thenApply(this::download);
        resulting.whenComplete((result, thrown) -> {
            if (resulting.isCancelled()) { //the check is not necessary, but communicates the intent better
                job.cancel(true);
            }
        });
        return resulting;
}

EDIT 2:

I've discovered tascalate-concurrent, a wonderful library providing a sane implementation of CompletionStage, with support for dependent promises (via the DependentPromise class) that can transparently back-propagate cancellations. Seems perfect for this use-case.

This should be enough:

DependentPromise
          .from(pollForCompletion(jobId))
          .thenApply(this::download, true); //true means the cancellation should back-propagate

Didn't test this approach, mind you.

like image 738
kaqqao Avatar asked Oct 26 '16 09:10

kaqqao


People also ask

What is CompletableFuture used for?

What is CompletableFuture? A CompltableFuture is used for asynchronous programming. Asynchronous programming means writing non-blocking code. It runs a task on a separate thread than the main application thread and notifies the main thread about its progress, completion or failure.

What is the difference between Future and CompletableFuture?

As a result: Future transferes single value using synchronous interface. CompletableFuture transferes single value using both synchronous and asynchronous interfaces. Rx transferes multiple values using asynchronous interface with backpressure.

Is CompletableFuture thread safe?

CompletableFuture is inherently thread-safe The results of a write by one thread are guaranteed to be visible to a read by another thread only if the write operation happens-before the read operation.

Why is CompletableFuture used in Java?

Instead of catching an exception in a syntactic block, the CompletableFuture class allows us to handle it in a special handle method. This method receives two parameters: a result of a computation (if it finished successfully), and the exception thrown (if some computation step did not complete normally).


1 Answers

Your structure is as follows:

           ┌──────────────────┐
           │ completionFuture |
           └──────────────────┘
             ↓              ↓
  ┌──────────────┐      ┌───────────┐
  │ whenComplete |      │ thenApply |
  └──────────────┘      └───────────┘

So when you cancel the thenApply future, the original completionFuture object remains unaffected as it doesn’t depend on the thenApply stage. If, however, you don’t chain the thenApply stage, you’re returning the original completionFuture instance and canceling this stage causes the cancellation of all dependent stages, causing the whenComplete action to be executed immediately.

But when the thenApply stage is cancelled, the completionFuture still may get completed when the pollRemoteServer(jobId).equals("COMPLETE") condition is fulfilled, as that polling doesn’t stop. But we don’t know the relationship of jobId = schedule(something) and pollRemoteServer(jobId). If your application state changes in a way that this condition can never be fulfilled after canceling a download, this future will never complete…


Regarding your last question, which future is “the one I should hold on to?”, there is no requirement to have a linear chain of futures, in fact, while the convenience methods of CompletableFuture make it easy to create such a chain, more than often, it’s the least useful thing to do, as you could just write a block of code, if you have a linear dependency. Your model of chaining two independent stages is right, but cancellation doesn’t work through it, but it wouldn’t work through a linear chain either.

If you want to be able to cancel the source stage, you need a reference to it, but if you want to be able to get the result of a dependent stage, you’ll need a reference to that stage too.

like image 173
Holger Avatar answered Oct 31 '22 07:10

Holger