Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

CompletableFuture is not getting executed. If I use the ExecutorService pool its work as expected but not with the default forkJoin common pool

I am trying to run the following class its getting terminated without executing the CompletableFuture.

public class ThenApplyExample {

public static void main(String[] args) throws Exception {
    //ExecutorService es = Executors.newCachedThreadPool();
    CompletableFuture<Student> studentCompletableFuture = CompletableFuture.supplyAsync(() -> {

        try {

            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 3;
    })// If I put executorservice created n commented above, programme work as expected.
            .thenApply(i -> {

                for (int j = 0; j <= i; j++) {
                    System.out.println("Inside first then apply");
                }
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("First then apply is finished");
                return ++i;
            })
            .thenApply(i -> {
                System.out.println("Inside 2nd then apply");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Inside 2nd then apply stopped");

                return i++;
            })
            .thenApply(i -> {
                System.out.println("Inside 3nd then apply");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Inside 3nd then apply stopped");
                return "The i is ::: " + i;
            })
            .thenApply(s -> Student.builder().id(1).name(s).address("Some address").build());
    System.out.println("Executing..");
    System.out.println("Executing..");
    System.out.println("Executing..");
    System.out.println("Executing..");
    System.out.println("Executing..");

    //es.shutdown();
}
} 

Output I am getting is

Executing..
Executing..
Executing..
Executing..
Executing..

Whereas expected output is

Executing..
Executing..
Executing..
Executing..
Executing..
Inside first then apply
Inside first then apply
Inside first then apply
Inside first then apply
First then apply is finished
Inside 2nd then apply
Inside 2nd then apply stopped
Inside 3nd then apply
Inside 3nd then apply stopped

Note : In the above programme, I am not using studentCompletableFuture.get(). I dont want to use it since it blocks the code.

If I add studentCompletableFuture.get() at the very end of the programme, it works as expected or if I add the executorservice in the supplyAsync 2nd argument(check comment in programme), it works again as expected.

My question is why it is terminating when programme uses the default ForkJoin common pool?

like image 691
Piyush Kumar Avatar asked Aug 16 '18 14:08

Piyush Kumar


People also ask

Is CompletableFuture blocked?

The CompletableFuture. get() method is blocking. It waits until the Future is completed and returns the result after its completion.

Does CompletableFuture throw exception?

The CompletableFuture. join() method is similar to the get method, but it throws an unchecked exception in case the Future does not complete normally.

Does CompletableFuture get block main thread?

It just provides a get() method which blocks until the result is available to the main thread. Ultimately, it restricts users from applying any further action on the result. You can create an asynchronous workflow with CompletableFuture.

How does CompletableFuture work in Java?

What is CompletableFuture? A CompltableFuture is used for asynchronous programming. Asynchronous programming means writing non-blocking code. It runs a task on a separate thread than the main application thread and notifies the main thread about its progress, completion or failure.


1 Answers

TL;DR: The ForkJoinPool uses daemon threads, whereas the ExecutorService is using non-daemon threads. The latter keep the JVM alive; the former do not. Also, the main thread is a non-daemon thread and when you block it waiting for the CompletableFuture to complete it remains alive (thus keeping the JVM alive).


Daemon vs Non-Daemon Threads

A Thread in Java can either be a daemon thread or a non-daemon thread. A daemon thread does not keep the JVM alive. This behavior is documented:

When a Java Virtual Machine starts up, there is usually a single non-daemon thread (which typically calls the method named main of some designated class). The Java Virtual Machine continues to execute threads until either of the following occurs [emphasis added]:

  • The exit method of class Runtime has been called and the security manager has permitted the exit operation to take place.
  • All threads that are not daemon threads have died [emphasis added], either by returning from the call to the run method or by throwing an exception that propagates beyond the run method.

In other words, it doesn't matter how many daemon threads are alive or what they're doing—if there are no non-daemon threads alive then the JVM will exit.

"Main" Thread

As noted in the above documentation, there is typically a single non-daemon thread when a JVM starts. And this thread is usually the one that invokes the main method. Unless other non-daemon threads are started (and stay alive) the JVM will exit once the main thread terminates.

ForkJoinPool

A ForkJoinPool uses daemon threads, at least by default. This behavior is also documented:

All worker threads are initialized with Thread.isDaemon() set true.

– Last sentence, second paragraph of class Javadoc

This means work submitted to the ForkJoinPool will not keep the JVM alive.

ExecutorService

Most of the ExecutorService instances returned by the factory methods in Executors are configured to use non-daemon threads by default. Unfortunately this default behavior does not seem to be documented. If you want the pool to use daemon threads, however, then you can supply a ThreadFactory.

An exception to this default behavior are the #newWorkStealingPool(...) methods. They return a ForkJoinPool (an implementation detail).


The Behavior of Your Code

The differences in behavior between the different versions of your code can be explained by the use of non-daemon threads versus daemon threads.

No Waiting for Task to Complete

Your original code looks like this (vastly simplified):

import java.util.concurrent.CompletableFuture;

public class Main {

  public static void main(String[] args) {
    CompletableFuture.runAsync(
        () -> {
          System.out.println("Sleeping...");
          Thread.sleep(2000L); // try-catch omitted for brevity
          System.out.println("Done!");
        });
  }
}

That code is launching an asynchronous task via CompletableFuture#runAsync(Runnable), which:

Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() after it runs the given action.

As you can see the task is passed to the common ForkJoinPool. That means the task is being executed by a daemon thread. You also do not wait for the task to complete. The runAsync call submits the task and returns immediately. Then the main thread simply exits the main method and terminates. Since the only non-daemon thread has terminated the JVM also exits—before the asynchronous task had time to complete.

Waiting for Task to Complete

When you modify your code to wait on the future:

import java.util.concurrent.CompletableFuture;

public class Main {

  public static void main(String[] args) throws Exception {
    CompletableFuture.runAsync(
            () -> {
              System.out.println("Sleeping...");
              Thread.sleep(2000L); // try-catch omitted for brevity
              System.out.println("Done!");
            })
        .get(); // wait for future to complete
  }
}

You are now blocking the main thread in the get() call. Said thread remains blocked until it's interrupted or the task completes (normally or exceptionally). This means a non-daemon thread remains alive until the task completes, thus the JVM stays alive.

Using Custom ExecutorService

Modifying the original code again, this time to use a custom ExecutorService:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

  public static void main(String[] args) {
    ExecutorService executor = Executors.newCachedThreadPool();
    CompletableFuture.runAsync(
        () -> {
          System.out.println("Sleeping...");
          Thread.sleep(2000L); // try-catch omitted for brevity
          System.out.println("Done!");
        },
        executor); // use custom ExecutorService
    executor.shutdown();
  }
}

Now instead of using the common ForkJoinPool the task is submitted to the given ExecutorService. In this case that thread pool is using non-daemon threads. And that means those threads will keep the JVM alive. This is true even though the main thread is allowed to exit the main method and terminate before the task completes.

The fact non-daemon threads are being used is why it's important to call #shutdown(). Without that the threads are allowed to persist and keep the JVM alive "indefinitely". Though it's possible a "cached thread pool", specifically, may allow all threads to eventually die from being idle too long.

Note calling #shutdown() still allows all already-submitted tasks to complete.


Addressing Comments

In one of your comments you ask:

Is there any other elegant way to make this run while using ForkJoinPool common pool threads and not using studentCompletableFuture.get().

I'm not sure what you would consider more "elegant", but you could use the #join() method instead. That method behaves very similarly to #get() without throwing checked exceptions. But be warned: A call to #join() cannot be interrupted. There's also no timeout overload (though you can combine it with orTimeout / completeOnTimeout in Java 9+).

In another one of your comments you mention:

I just checked ForkJoinPool common threads are not daemon thread by using System.out.println("The thread is :: "+ Thread.currentThread().getName() + Thread.currentThread().isDaemon());

I don't know why or how you're seeing that, but the following:

import java.util.concurrent.CompletableFuture;

public class Main {

  public static void main(String[] args) {
    CompletableFuture.runAsync(
            () -> {
              Thread t = Thread.currentThread();
              System.out.printf("Thread[name=%s, daemon=%s]%n", t.getName(), t.isDaemon());
            })
        .join();
  }
}

Gives this output:

Thread[name=ForkJoinPool.commonPool-worker-3, daemon=true]
like image 65
Slaw Avatar answered Sep 21 '22 09:09

Slaw