
Why does my CompletableFuture code run in Java 8 but not in Java 11?

Why does this block of code behave differently in Java 8 vs Java 11?

private static String test2() {
    CompletableFuture
            .runAsync(() -> IntStream.rangeClosed(1, 20).forEach(x -> {
                try {
                    Thread.sleep(500);
                    System.out.println(x);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }));

    return "Finish";
}

I expected it to print Finish, then print the numbers 1 to 20 at 500 ms intervals, and then stop. That is exactly what happens on Java 8.

When I run exactly the same method on Java 11, however, it prints Finish and terminates without ever running the runAsync(...) code. I got it to run by adding an ExecutorService like this:

private static String test2() {

    final ExecutorService executorService = Executors.newFixedThreadPool(10);
    CompletableFuture
            .runAsync(() -> IntStream.rangeClosed(1, 10).forEach(x -> {
                try {
                    Thread.sleep(500);
                    System.out.println(x);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }), executorService);

    return "Finish";
}

And now it gets executed, but doesn't finish; it gets to 10 and after that just sits without finishing. I figured out how to stop execution by invoking executorService.shutdown(); right before return, but I am 100% sure that this approach is wrong, because usually I will have the same executorService for many methods, and if I shut it down other methods will also fail to execute.

What changed between Java 8 and Java 11 here, and why do I now have to add an explicit executor service, and most importantly how can I finish the method execution properly?

Misa D. asked Apr 19 '21 14:04



3 Answers

TL;DR - add ForkJoinPool.commonPool().awaitQuiescence(1000, TimeUnit.SECONDS); after your call to CompletableFuture.runAsync, at the end of your code, so that program exit (System.exit) doesn't stop your runnable. That way you'll get the behavior you expect.


Longer answer:

Okay, so first things first: I tried both examples on Oracle's Java 8, OpenJDK 8, and OpenJDK 11. The behavior was consistent across the board, so my answer is that nothing has changed between these Java versions that would cause this discrepancy. In both examples, the behavior you see is consistent with what Java tells you it will do.

From the documentation of CompletableFuture.runAsync:

Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() after it runs the given action.

Okay... let's see what ForkJoinPool.commonPool will tell us (emphasis mine):

Returns the common pool instance. This pool is statically constructed; its run state is unaffected by attempts to shutdown() or shutdownNow(). However this pool and any ongoing processing are automatically terminated upon program System.exit(int). Any program that relies on asynchronous task processing to complete before program termination should invoke commonPool().awaitQuiescence, before exit.

Aha, so that's why we don't see the count when using the common pool: the common pool is terminated when the program exits, which is exactly what happens when we return from the method and main finishes (assuming your example really is as simple as you show it, with a single method call in main).

So why does the custom executor work? Because, as you've already noticed, that executor has not been terminated. Its worker threads are still alive in the background, although idle, and because they are non-daemon threads the JVM will not exit while they exist.


So what can we do now?

One option is to create our own executor and shut it down once we are done, much as you suggested. I would argue that this approach isn't all that bad.

The second option is to follow what the Javadoc says:
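To address the worry about sharing one executor across many methods, here is a minimal sketch in which the methods only submit work and a single place shuts the executor down at the end. The class and method names (SharedExecutorDemo, countAsync, runDemo) are hypothetical, and the pool size and the 10-second timeout are arbitrary assumptions.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; all names here are illustrative.
class SharedExecutorDemo {

    // Submits the counting task to the shared executor and returns immediately.
    static String countAsync(ExecutorService executor, List<String> out, String label, int upTo) {
        CompletableFuture.runAsync(() -> {
            for (int x = 1; x <= upTo; x++) {
                try {
                    Thread.sleep(20); // short delay so the demo finishes quickly
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                out.add(label + x);
            }
        }, executor);
        return "Finish";
    }

    // Owns the executor: several methods use it, and it is shut down exactly once.
    static List<String> runDemo() {
        ExecutorService shared = Executors.newFixedThreadPool(4);
        List<String> out = new CopyOnWriteArrayList<>();
        countAsync(shared, out, "a", 3);
        countAsync(shared, out, "b", 3);

        shared.shutdown(); // rejects new tasks; already-submitted tasks still run
        try {
            if (!shared.awaitTermination(10, TimeUnit.SECONDS)) {
                System.err.println("Timed out waiting for tasks");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Because shutdown() only stops new submissions, calling it once after the last task has been submitted lets every earlier task finish and then lets the JVM exit.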

Any program that relies on asynchronous task processing to complete before program termination should invoke commonPool().awaitQuiescence, before exit.

public boolean awaitQuiescence(long timeout, TimeUnit unit)

If called by a ForkJoinTask operating in this pool, equivalent in effect to ForkJoinTask.helpQuiesce(). Otherwise, waits and/or attempts to assist performing tasks until this pool isQuiescent() or the indicated timeout elapses.

So we can call that method and specify a timeout that applies to everything running in the common pool. In my opinion this is somewhat business specific, since you now have to answer the question: what the heck should the timeout be?

The third option is to use the power of CompletableFuture and assign the result of runAsync to a variable:
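A minimal sketch of the awaitQuiescence approach follows. The class and method names are hypothetical, and the 10-second timeout is an arbitrary assumption. The sketch passes ForkJoinPool.commonPool() to runAsync explicitly; that is also an assumption made for predictability, because the CompletableFuture documentation notes that the no-executor overloads start a new thread per task when the common pool does not support a parallelism level of at least two, and awaitQuiescence would not wait for such a thread.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; all names here are illustrative.
class AwaitQuiescenceDemo {

    static List<Integer> countThenWait() {
        List<Integer> seen = new CopyOnWriteArrayList<>();

        // Run the counting task on the common pool (passed explicitly, see lead-in).
        CompletableFuture.runAsync(() -> {
            for (int x = 1; x <= 5; x++) {
                try {
                    Thread.sleep(20); // short delay so the demo finishes quickly
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                seen.add(x);
            }
        }, ForkJoinPool.commonPool());

        // Wait until the common pool has no running or queued tasks, or the timeout elapses.
        boolean quiescent = ForkJoinPool.commonPool().awaitQuiescence(10, TimeUnit.SECONDS);
        if (!quiescent) {
            System.err.println("Timed out before the common pool became quiescent");
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("Finish");
        System.out.println(countThenWait());
    }
}
```

Note that the calling thread may help run pool tasks while it waits, so the count may execute on the caller's thread rather than a pool worker.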

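CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> ...
// ... any other code that should run while the task executes ...
// join() blocks until the task completes; a failure surfaces as an unchecked CompletionException
voidCompletableFuture.join();
// or, if you want checked exceptions (InterruptedException, ExecutionException), use get()
voidCompletableFuture.get();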

Then, right at the point where you need the result, you call join() or get(). I prefer this option because the code is the cleanest and easiest to understand. I can also chain my CompletableFutures however I want and combine them in more elaborate ways.
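Applied to the method from the question, the join() approach could look like the sketch below. JoinDemo is a hypothetical class name, and the results are collected in a list (instead of printed) and the sleep is shortened, purely so the example is quick to run and check.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.IntStream;

// Hypothetical demo class; all names here are illustrative.
class JoinDemo {

    static List<String> test2() {
        List<String> log = new CopyOnWriteArrayList<>();

        // Keep a reference to the future instead of discarding it.
        CompletableFuture<Void> counting = CompletableFuture.runAsync(() ->
                IntStream.rangeClosed(1, 5).forEach(x -> {
                    try {
                        Thread.sleep(20); // short delay so the demo finishes quickly
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.add(String.valueOf(x));
                }));

        log.add("Finish"); // runs concurrently with the counting task

        counting.join(); // block here until the async task has completed
        return log;
    }

    public static void main(String[] args) {
        System.out.println(test2());
    }
}
```

The position of "Finish" relative to the numbers is not fixed, since the main thread and the task race each other; join() only guarantees that all numbers are recorded before the method returns.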


If you don't need a return value, don't need to do anything else, and just want to return a string while counting from 1 to 20 asynchronously, then put ForkJoinPool.commonPool().awaitQuiescence(1000, TimeUnit.SECONDS); somewhere convenient before the program exits and give it a generously large timeout. The call returns as soon as the pool is idle, so the program exits once all submitted tasks have finished.

mnestorov answered Oct 21 '22 23:10


I would say your program just behaves as expected. runAsync runs the provided action in the common fork-join pool unless you provide an executor. In either case, if you don't wait for your completable future to complete, the method test2 returns "Finish" immediately.

"Finish" could be printed at any time: you may see "Finish, 1, ..." or "1, Finish, 2, ..." and so on. It's a race condition.

When you don't use an executor, because threads in the common pool are daemon threads, your program may exit at any time, and won't wait for the scheduled action to complete.

When you use an executor with non-daemon threads (which is usually the default), the program won't exit until the executor is shut down.
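The daemon status described in the two statements above can be checked directly. This is a small sketch with hypothetical names; it passes ForkJoinPool.commonPool() explicitly so that the first check always runs on a common-pool worker thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

// Hypothetical demo class; all names here are illustrative.
class DaemonCheck {

    // Reports whether the common-pool worker that runs the task is a daemon thread.
    static boolean commonPoolThreadIsDaemon() {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().isDaemon(), ForkJoinPool.commonPool())
                .join();
    }

    // Reports the same for a thread created by Executors.newFixedThreadPool.
    static boolean fixedPoolThreadIsDaemon() {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().isDaemon(), executor)
                    .join();
        } finally {
            executor.shutdown(); // without this, the non-daemon worker keeps the JVM alive
        }
    }

    public static void main(String[] args) {
        System.out.println("common pool daemon: " + commonPoolThreadIsDaemon());
        System.out.println("fixed pool daemon:  " + fixedPoolThreadIsDaemon());
    }
}
```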

The only way to make sure that your program doesn't exit before your action is finished is to wait for the completable future to complete, calling either get or join as was suggested in the other answer.

daniel answered Oct 22 '22 00:10


If it has to be CompletableFuture:

private static String test2() {
    EventQueue.invokeLater(new Runnable() {
        @Override
        public void run() {
            try {
                CompletableFuture.runAsync(() -> IntStream.rangeClosed(1, 20).forEach(x -> {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.err.println(x);
                })).get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    });
    return "Finish";
}

I think you would be better off with this alternative to CompletableFuture:

private static String test2() {
    Runnable runner = new Runnable() {
        @Override
        public void run() {
            IntStream.rangeClosed(1, 20).forEach(x -> {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(x);
            });
        }
    };
    // Cached-pool threads are non-daemon and are reclaimed after 60 seconds idle,
    // so the JVM can exit about a minute after the last number is printed.
    Executors.newCachedThreadPool().execute(runner);
    return "Finish";
}
Kaplan answered Oct 22 '22 00:10