Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Custom thread pool in Java 8 parallel stream

Is it possible to specify a custom thread pool for Java 8 parallel stream? I can not find it anywhere.

Imagine that I have a server application and I would like to use parallel streams. But the application is large and multi-threaded so I want to compartmentalize it. I do not want a slow running task in one module of the applicationblock tasks from another module.

If I can not use different thread pools for different modules, it means I can not safely use parallel streams in most of real world situations.

Try the following example. There are some CPU intensive tasks executed in separate threads. The tasks leverage parallel streams. The first task is broken, so each step takes 1 second (simulated by thread sleep). The issue is that other threads get stuck and wait for the broken task to finish. This is contrived example, but imagine a servlet app and someone submitting a long running task to the shared fork join pool.

public class ParallelTest {     public static void main(String[] args) throws InterruptedException {         ExecutorService es = Executors.newCachedThreadPool();          es.execute(() -> runTask(1000)); //incorrect task         es.execute(() -> runTask(0));         es.execute(() -> runTask(0));         es.execute(() -> runTask(0));         es.execute(() -> runTask(0));         es.execute(() -> runTask(0));           es.shutdown();         es.awaitTermination(60, TimeUnit.SECONDS);     }      private static void runTask(int delay) {         range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()                 .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));     }      public static boolean isPrime(long n) {         return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);     } } 
like image 563
Lukas Avatar asked Jan 16 '14 13:01

Lukas


People also ask

Does parallel stream use thread pool?

2. Parallel Stream. The default processing that occurs in such a Stream uses the ForkJoinPool. commonPool(), a thread pool shared by the entire application.

What is the thread pool used by Java parallel stream?

The parallel stream by default uses ForkJoinPool. commonPool which has one less thread than number of processor.

How do you control threads in a parallel stream?

The idea is to create a custom fork-join pool with a desirable number of threads and execute the parallel stream within it. This allows developers to control the threads that parallel stream uses. Additionally, it separates the parallel stream thread pool from the application pool which is considered a good practice.


1 Answers

There actually is a trick how to execute a parallel operation in a specific fork-join pool. If you execute it as a task in a fork-join pool, it stays there and does not use the common one.

final int parallelism = 4; ForkJoinPool forkJoinPool = null; try {     forkJoinPool = new ForkJoinPool(parallelism);     final List<Integer> primes = forkJoinPool.submit(() ->         // Parallel task here, for example         IntStream.range(1, 1_000_000).parallel()                 .filter(PrimesPrint::isPrime)                 .boxed().collect(Collectors.toList())     ).get();     System.out.println(primes); } catch (InterruptedException | ExecutionException e) {     throw new RuntimeException(e); } finally {     if (forkJoinPool != null) {         forkJoinPool.shutdown();     } } 

The trick is based on ForkJoinTask.fork which specifies: "Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()"

like image 194
Lukas Avatar answered Sep 28 '22 11:09

Lukas