
Default ForkJoinPool executor taking long time

I am working with CompletableFuture for asynchronous execution of tasks streamed from a list.

I am testing the two overloads of CompletableFuture's "supplyAsync": one takes only a Supplier, and the other takes a Supplier and an Executor. Here is the documentation for both:

First:

supplyAsync(Supplier supplier)

Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() with the value obtained by calling the given Supplier.

Second:

supplyAsync(Supplier supplier, Executor executor)

Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier.

And here is my test class:

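import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class TestCompleteableAndParallelStream {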

    public static void main(String[] args) {
        List<MyTask> tasks = IntStream.range(0, 10)
                .mapToObj(i -> new MyTask(1))
                .collect(Collectors.toList());
        
        useCompletableFuture(tasks);
        
        useCompletableFutureWithExecutor(tasks);

    }
    
    public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
          long start = System.nanoTime();
          ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
          List<CompletableFuture<Integer>> futures =
              tasks.stream()
                   .map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
                   .collect(Collectors.toList());
         
          List<Integer> result =
              futures.stream()
                     .map(CompletableFuture::join)
                     .collect(Collectors.toList());
          long duration = (System.nanoTime() - start) / 1_000_000;
          System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
          System.out.println(result);
          executor.shutdown();
        }
    
    public static void useCompletableFuture(List<MyTask> tasks) {
          long start = System.nanoTime();
          List<CompletableFuture<Integer>> futures =
              tasks.stream()
                   .map(t -> CompletableFuture.supplyAsync(() -> t.calculate()))
                   .collect(Collectors.toList());
         
          List<Integer> result =
              futures.stream()
                     .map(CompletableFuture::join)
                     .collect(Collectors.toList());
          long duration = (System.nanoTime() - start) / 1_000_000;
          System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
          System.out.println(result);
        }
    
    

}


class MyTask {
      private final int duration;
      public MyTask(int duration) {
        this.duration = duration;
      }
      public int calculate() {
        System.out.println(Thread.currentThread().getName());
        try {
          Thread.sleep(duration * 1000);
        } catch (final InterruptedException e) {
          // Restore the interrupt flag before propagating the failure.
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        }
        return duration;
      }
    }

While the "useCompletableFuture" method takes around 4 seconds to complete, the "useCompletableFutureWithExecutor" method takes only 1 second.

Now my question is: what does ForkJoinPool.commonPool() do differently that could cause this overhead? And given that, shouldn't we always prefer a custom executor pool over the ForkJoinPool?

asked Aug 02 '17 12:08 by KayV



2 Answers

Check the size of ForkJoinPool.commonPool(). By default it is created with a parallelism of

Runtime.getRuntime().availableProcessors() - 1

I ran your example on my Intel i7-4800MQ (4 cores + 4 virtual cores), where the common pool size is 7, so the ten tasks ran in two batches and the whole computation took ~2000 ms:

ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-5
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-7
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-1
Processed 10 tasks in 2005 millis
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
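If you want to check these numbers on your own machine, something like the following should do. It is only a minimal sketch: the class name CommonPoolSize and its helper method are made up for illustration, and the values printed depend on your hardware and JVM settings.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolSize {

    // Hypothetical helper: reports the parallelism the common pool was built with.
    static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        // Number of processors the JVM sees (includes hyper-threads).
        System.out.println("availableProcessors    = "
                + Runtime.getRuntime().availableProcessors());
        // Target number of worker threads in the common pool.
        System.out.println("commonPool parallelism = " + commonPoolParallelism());
    }
}
```

On the i7 mentioned above this would be expected to report 8 processors and a parallelism of 7, matching the seven distinct worker names in the output.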

In the second case you used

Executors.newFixedThreadPool(Math.min(tasks.size(), 10));

so the pool has 10 threads ready to perform the calculation, and all tasks run in ~1000 ms:

pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-5
pool-1-thread-6
pool-1-thread-7
pool-1-thread-8
pool-1-thread-9
pool-1-thread-10
Processed 10 tasks in 1002 millis
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
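The timings in both runs follow from simple arithmetic: with equal-length blocking tasks, the elapsed time is roughly ceil(tasks / threads) multiplied by the task duration. Here is a rough sketch of that model. The class and method names are hypothetical, and the model assumes the pool starts no extra threads and that scheduling overhead is negligible.

```java
class BatchEstimate {

    // Rough model: equal-length blocking tasks run in "waves" of at most
    // `threads` tasks each, so elapsed time = number of waves * task duration.
    static long estimateMillis(int tasks, int threads, long taskMillis) {
        long waves = (tasks + threads - 1) / threads; // ceiling division
        return waves * taskMillis;
    }

    public static void main(String[] args) {
        System.out.println(estimateMillis(10, 7, 1000));  // 2000: common pool with 7 workers
        System.out.println(estimateMillis(10, 10, 1000)); // 1000: fixed pool with 10 threads
        System.out.println(estimateMillis(10, 3, 1000));  // 4000: common pool with 3 workers
    }
}
```

The last line is my guess at the question's numbers: about 4 seconds would be consistent with a common pool of 3 workers, i.e. a machine reporting 4 processors. The question does not state the hardware, so treat that as an assumption.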

Difference between ForkJoinPool and ExecutorService

Eugene also mentioned one more important thing in his comment: ForkJoinPool uses a work-stealing approach:

A ForkJoinPool differs from other kinds of ExecutorService mainly by virtue of employing work-stealing: all threads in the pool attempt to find and execute tasks submitted to the pool and/or created by other active tasks (eventually blocking waiting for work if none exist). This enables efficient processing when most tasks spawn other subtasks (as do most ForkJoinTasks), as well as when many small tasks are submitted to the pool from external clients. Especially when setting asyncMode to true in constructors, ForkJoinPools may also be appropriate for use with event-style tasks that are never joined.

An ExecutorService created with Executors.newFixedThreadPool(), by contrast, keeps a fixed number of worker threads that all take tasks from one shared queue, without work-stealing.
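To see that the timing gap comes from the number of threads rather than from the pool type, you could hand supplyAsync a dedicated ForkJoinPool with ten workers instead of the common pool. This is only a sketch: DedicatedForkJoinPoolDemo, blockingTask and runOn are names invented for the example, and blockingTask stands in for MyTask.calculate() from the question.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class DedicatedForkJoinPoolDemo {

    // Stand-in for MyTask.calculate(): blocks for the given time, then returns 1.
    static int blockingTask(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return 1;
    }

    // Submits taskCount blocking tasks to the given pool and waits for all results.
    static List<Integer> runOn(ForkJoinPool pool, int taskCount, long millis) {
        List<CompletableFuture<Integer>> futures = IntStream.range(0, taskCount)
                .mapToObj(i -> CompletableFuture.supplyAsync(() -> blockingTask(millis), pool))
                .collect(Collectors.toList());
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // A private ForkJoinPool with parallelism 10, independent of the common pool.
        ForkJoinPool pool = new ForkJoinPool(10);
        try {
            long start = System.nanoTime();
            List<Integer> result = runOn(pool, 10, 1000);
            long duration = (System.nanoTime() - start) / 1_000_000;
            System.out.printf("Processed %d tasks in %d millis%n", result.size(), duration);
        } finally {
            pool.shutdown();
        }
    }
}
```

If the reasoning above holds, this should finish in roughly the same time as the fixed thread pool, since each of the ten sleeping tasks gets its own worker.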

How to determine pool size?

There is a question about choosing the best thread pool size; you may find useful information there:

Setting Ideal size of Thread Pool

This thread is also worth reading:

Custom thread pool in Java 8 parallel stream

answered Oct 16 '22 22:10 by Szymon Stepniak


After checking further solutions on the internet, I found that the default size of the common ForkJoinPool can be changed with the following system property:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=16

This property lets the common pool run more tasks in parallel. It has to be set before the common pool is first used, because the pool is created only once.
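If passing a -D flag is not convenient, the same property can in principle be set from code, as long as that happens before anything touches the common pool. This is a minimal sketch under that assumption: the class CommonPoolParallelismConfig and its configure method are hypothetical names, and the setting is silently ignored if the common pool has already been created.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolParallelismConfig {

    static final String PROPERTY =
            "java.util.concurrent.ForkJoinPool.common.parallelism";

    // Hypothetical helper: sets the property and returns the stored value.
    // It only affects the common pool if the pool has not been created yet.
    static String configure(int parallelism) {
        System.setProperty(PROPERTY, Integer.toString(parallelism));
        return System.getProperty(PROPERTY);
    }

    public static void main(String[] args) {
        // Call this first, before any parallel stream or supplyAsync call.
        configure(16);
        System.out.println("commonPool parallelism = "
                + ForkJoinPool.commonPool().getParallelism());
    }
}
```

The command-line flag is the safer route, since it is guaranteed to be in place before any library code can initialise the pool.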

answered Oct 16 '22 23:10 by KayV