I am working with CompletableFuture for async execution of a stream generated from a list source.
I am testing the overloaded "supplyAsync" method of CompletableFuture: one overload takes only a Supplier parameter, while the other takes a Supplier and an Executor. Here is the documentation for both:
First:
supplyAsync(Supplier<U> supplier)
Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() with the value obtained by calling the given Supplier.
Second:
supplyAsync(Supplier<U> supplier, Executor executor)
Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier.
And here is my test class:
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class TestCompleteableAndParallelStream {

    public static void main(String[] args) {
        // 10 tasks, each sleeping for 1 second in calculate()
        List<MyTask> tasks = IntStream.range(0, 10)
                .mapToObj(i -> new MyTask(1))
                .collect(Collectors.toList());

        useCompletableFuture(tasks);
        useCompletableFutureWithExecutor(tasks);
    }

    public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
        long start = System.nanoTime();
        ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));

        List<CompletableFuture<Integer>> futures =
                tasks.stream()
                        .map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
                        .collect(Collectors.toList());

        List<Integer> result =
                futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList());

        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
        System.out.println(result);
        executor.shutdown();
    }

    public static void useCompletableFuture(List<MyTask> tasks) {
        long start = System.nanoTime();

        List<CompletableFuture<Integer>> futures =
                tasks.stream()
                        .map(t -> CompletableFuture.supplyAsync(() -> t.calculate()))
                        .collect(Collectors.toList());

        List<Integer> result =
                futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList());

        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
        System.out.println(result);
    }
}

class MyTask {
    private final int duration;

    public MyTask(int duration) {
        this.duration = duration;
    }

    public int calculate() {
        // Print the thread that runs the task, then simulate work by sleeping
        System.out.println(Thread.currentThread().getName());
        try {
            Thread.sleep(duration * 1000);
        } catch (final InterruptedException e) {
            throw new RuntimeException(e);
        }
        return duration;
    }
}
while the "useCompletableFuture" method takes around 4 seconds to complete, "useCompletableFutureWithExecutor" method takes only 1 second to complete.
No my question is, What different processing does the ForkJoinPool.commonPool() which could do the overhead? In that shouldn't we always prefer the custom executor pool over ForkJoinPool?
(By default, the common pool allows a maximum of 256 spare threads.) Using a value (for example Integer.MAX_VALUE) larger than the implementation's total thread limit has the same effect as using this limit (which is the default).
public static ForkJoinPool commonPool() returns the common pool instance. This pool is statically constructed; its run state is unaffected by attempts to shutdown() or shutdownNow(). However, this pool and any ongoing processing are automatically terminated upon program System.exit(int).
The Fork/Join framework in Java 7 is an implementation of the Divide and Conquer algorithm, in which a central ForkJoinPool executes branching ForkJoinTasks. ExecutorService is an Executor that provides methods to manage the progress-tracking and termination of asynchronous tasks.
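To make that divide-and-conquer idea concrete, here is a minimal, self-contained sketch of a ForkJoinTask that sums an array by recursively splitting the range (the class name SumTask and the threshold are purely illustrative):

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Illustrative divide-and-conquer task: sums a slice of an array by
// splitting it until the slice is small enough to sum directly.
class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // arbitrary cut-off for this sketch
    private final long[] numbers;
    private final int from, to;

    SumTask(long[] numbers, int from, int to) {
        this.numbers = numbers;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += numbers[i];
            }
            return sum;
        }
        int mid = (from + to) / 2;
        SumTask left = new SumTask(numbers, from, mid);
        SumTask right = new SumTask(numbers, mid, to);
        left.fork();                          // run the left half asynchronously
        return right.compute() + left.join(); // compute the right half here, then join
    }
}

// Usage: long sum = new ForkJoinPool().invoke(new SumTask(array, 0, array.length));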
ForkJoinPool is designed for CPU-intensive workloads. The default number of threads in a ForkJoinPool is equal to the number of CPUs on the system. If a thread goes into a waiting state because it called join() on some other ForkJoinTask, a new compensatory thread is started so that all CPUs of the system stay utilized.
If your default ForkJoinPool ended up with only three worker threads, your ten one-second tasks were spread across them, so each thread executed three or four tasks consecutively, which is why you measured about four seconds.
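If you really do need to run blocking calls (like the Thread.sleep above) on the common pool, ForkJoinPool.managedBlock is the hook that triggers those compensatory threads. A minimal sketch, assuming the wrapper class SleepBlocker is something you define yourself:

import java.util.concurrent.ForkJoinPool;

// Hypothetical wrapper: tells the ForkJoinPool that we are about to block,
// so it may start a compensatory thread to keep the CPUs busy.
class SleepBlocker implements ForkJoinPool.ManagedBlocker {
    private final long millis;
    private boolean done;

    SleepBlocker(long millis) {
        this.millis = millis;
    }

    @Override
    public boolean block() throws InterruptedException {
        Thread.sleep(millis);
        done = true;
        return true; // no further blocking needed
    }

    @Override
    public boolean isReleasable() {
        return done;
    }
}

// Usage inside a task running on the common pool:
// CompletableFuture.supplyAsync(() -> {
//     try {
//         ForkJoinPool.managedBlock(new SleepBlocker(1000));
//     } catch (InterruptedException e) {
//         throw new RuntimeException(e);
//     }
//     return 1;
// });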
Check the ForkJoinPool.commonPool() size. By default it creates a pool with a size of Runtime.getRuntime().availableProcessors() - 1. I ran your example on my Intel i7-4800MQ (4 cores + 4 virtual cores), where the size of the common pool is 7, so the whole computation took ~2000 ms:
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-5
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-7
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-1
Processed 10 tasks in 2005 millis
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
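You can verify the numbers on your own machine with a quick check like this (a small sketch using only the standard library):

import java.util.concurrent.ForkJoinPool;

public class PoolSizeCheck {
    public static void main(String[] args) {
        // Number of hardware threads the JVM sees
        System.out.println("availableProcessors = "
                + Runtime.getRuntime().availableProcessors());
        // Parallelism of the common pool (typically availableProcessors() - 1)
        System.out.println("commonPool parallelism = "
                + ForkJoinPool.commonPool().getParallelism());
    }
}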
In the second case you used Executors.newFixedThreadPool(Math.min(tasks.size(), 10)), so the pool has 10 threads ready to perform the calculation and all tasks run in ~1000 ms:
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-5
pool-1-thread-6
pool-1-thread-7
pool-1-thread-8
pool-1-thread-9
pool-1-thread-10
Processed 10 tasks in 1002 millis
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
ForkJoinPool and ExecutorService
Eugene in his comment also mentioned one more important thing: ForkJoinPool uses a work-stealing approach:
A ForkJoinPool differs from other kinds of ExecutorService mainly by virtue of employing work-stealing: all threads in the pool attempt to find and execute tasks submitted to the pool and/or created by other active tasks (eventually blocking waiting for work if none exist). This enables efficient processing when most tasks spawn other subtasks (as do most ForkJoinTasks), as well as when many small tasks are submitted to the pool from external clients. Especially when setting asyncMode to true in constructors, ForkJoinPools may also be appropriate for use with event-style tasks that are never joined.
while an ExecutorService created with .newFixedThreadPool() uses a single shared work queue from which all worker threads take tasks, with no work stealing.
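Note that the executor you pass to supplyAsync does not have to be a fixed thread pool; if you want a larger pool but still want work-stealing behaviour, you can hand CompletableFuture your own ForkJoinPool instance. A minimal sketch (the pool size of 10 is chosen just to match the example above):

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CustomForkJoinPoolExample {
    public static void main(String[] args) {
        // A private ForkJoinPool with 10 workers instead of the shared common pool
        ForkJoinPool pool = new ForkJoinPool(10);

        List<CompletableFuture<Integer>> futures = IntStream.range(0, 10)
                .mapToObj(i -> CompletableFuture.supplyAsync(() -> {
                    try {
                        Thread.sleep(1000); // stand-in for MyTask.calculate()
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    return 1;
                }, pool))
                .collect(Collectors.toList());

        List<Integer> result = futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());

        System.out.println(result); // [1, 1, ...] after roughly one second
        pool.shutdown();
    }
}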
There was a question about the best thread pool size; you may find useful information there:
Setting Ideal size of Thread Pool
This thread is also a good place to investigate:
Custom thread pool in Java 8 parallel stream
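As a rough guide, the sizing formula usually quoted in that discussion (from Java Concurrency in Practice) is threads = cores * target utilization * (1 + wait time / compute time). A small sketch of that calculation, with made-up sample values for the wait/compute ratio:

public class PoolSizeEstimate {
    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        double targetUtilization = 1.0;  // assume we may use all cores
        double waitTime = 1000;          // ms spent blocked (sleeping/IO), sample value
        double computeTime = 1;          // ms spent on the CPU, sample value

        // threads = cores * utilization * (1 + W/C)
        int threads = (int) (cores * targetUtilization * (1 + waitTime / computeTime));
        System.out.println("Suggested pool size: " + threads);
    }
}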
Checking further for solutions on the internet, I found that we can change the default size of the common ForkJoinPool using the following system property:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=16
So this property can further help in making the ForkJoinPool be utilised more efficiently and with more parallelism.
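For example, a sketch assuming the property is set before anything touches the common pool (either on the command line as above, or programmatically at the very start of main):

import java.util.concurrent.ForkJoinPool;

public class CommonPoolParallelism {
    public static void main(String[] args) {
        // Must happen before the common pool is first used; equivalent to
        // -Djava.util.concurrent.ForkJoinPool.common.parallelism=16
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "16");

        // Typically prints 16 when nothing has touched the common pool yet
        System.out.println(ForkJoinPool.commonPool().getParallelism());
    }
}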