Java parallelStream() with custom pool with caller work stealing?

Normally when one uses Java 8's parallelStream(), the result is execution via the default, common fork-join pool (i.e. ForkJoinPool.commonPool()).

That is clearly undesirable, however, if the work is far from CPU-bound, e.g. because it waits on I/O much of the time. In such cases one will want a separate pool, sized according to other criteria (e.g. what fraction of the time the tasks are likely to actually be using the CPU).
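As a rough illustration of "sized according to other criteria", here is a minimal sketch of one common rule of thumb: divide the core count by the fraction of time a task is expected to spend on the CPU. The class `PoolSizing`, the method `threadsFor` and the `0.25` figure are hypothetical, chosen only for this example; real sizing should come from measurement.

```java
import java.util.concurrent.ForkJoinPool;

final class PoolSizing {
    // Hypothetical helper (not a JDK API): estimate a thread count from the
    // fraction of wall-clock time each task spends on the CPU.
    static int threadsFor(int cores, double cpuFraction) {
        if (cores < 1 || !(cpuFraction > 0.0 && cpuFraction <= 1.0)) {
            throw new IllegalArgumentException("need cores >= 1 and 0 < cpuFraction <= 1");
        }
        long estimate = Math.round(cores / cpuFraction);
        // ForkJoinPool rejects a parallelism above 32767, so clamp the estimate.
        return (int) Math.min(Math.max(estimate, 1L), 32767L);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // Assumption for the example: tasks use the CPU about 25% of the time.
        ForkJoinPool ioPool = new ForkJoinPool(threadsFor(cores, 0.25));
        System.out.println("parallelism = " + ioPool.getParallelism());
        ioPool.shutdown();
    }
}
```

With 4 cores and tasks that compute a quarter of the time, this suggests 16 threads; a fully CPU-bound task (`cpuFraction` of 1.0) gives back the core count.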

There's no obvious means of getting parallelStream() to use a different pool, but there is a way as detailed here.
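For readers without the linked answer at hand, the usual form of that trick is to submit the stream's terminal operation as a task to the custom pool, so that the stream's subtasks are forked from one of its worker threads. A minimal sketch follows; the class and method names are made up for the example, and the behaviour relies on how current JDKs implement streams rather than on anything the Stream API promises.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

final class CustomPoolStream {
    // Hypothetical helper: runs the terminal operation inside 'pool'.
    // Subtasks forked from a pool worker stay in that pool (implementation detail).
    static int sumOfSquares(ForkJoinPool pool, List<Integer> values) {
        ForkJoinTask<Integer> task = pool.submit(
                () -> values.parallelStream().mapToInt(v -> v * v).sum());
        // join() blocks the caller; the caller does not help with the work here.
        return task.join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            System.out.println(sumOfSquares(pool, Arrays.asList(1, 2, 3, 4)));
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that the calling thread only waits in `join()`; it contributes nothing to the computation.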

Unfortunately, that approach entails invoking the terminal operation on the parallel stream from a fork-join pool thread. The downside of this is that if the target fork-join pool is completely busy with existing work, the whole execution will wait on it while doing absolutely nothing. Thus the pool can become a bottleneck worse than single-threaded execution. By contrast, when one uses parallelStream() in the "normal" fashion, ForkJoinPool.common.externalHelpComplete() or ForkJoinPool.common.tryExternalUnpush() are used to let the calling thread from outside the pool help in the processing.

Does anyone know of a way to both get parallelStream() to use a non-default fork-join pool and have a calling thread from outside the fork-join pool help in the processing of this work (but not the rest of the fork-join pool's work)?

Jess Holle asked Oct 01 '15 15:10



1 Answer

You can use awaitQuiescence on the pool to help out. However, you can’t select which task(s) you will help; it will just take the next pending task from the pool. Thus, if there are more pending tasks, you might end up executing these before getting to your own.

ForkJoinPool forkJoinPool = new ForkJoinPool(1);
// make all threads busy:
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE));
// submit our task (may contain your stream operation)
ForkJoinTask<Thread> task = forkJoinPool.submit(() -> Thread.currentThread());
// help out
while(!task.isDone()) // use zero timeout to execute one task only
    forkJoinPool.awaitQuiescence(0, TimeUnit.NANOSECONDS);
System.out.println(Thread.currentThread()==task.get());

will print true.

whereas

ForkJoinPool forkJoinPool = new ForkJoinPool(1);
// make all threads busy:
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE));
// overload:
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE));
// submit our task (may contain your stream operation)
ForkJoinTask<Thread> task = forkJoinPool.submit(() -> Thread.currentThread());
// help out
while(!task.isDone())
    forkJoinPool.awaitQuiescence(0, TimeUnit.NANOSECONDS);
System.out.println(Thread.currentThread()==task.get());

will hang forever as it attempts to execute the second blocking task.

Nevertheless, it lets the initiating thread help process the pool’s pending tasks, which raises the chance of its own task getting executed, as long as there are no infinite tasks (the example above is extreme and chosen only for demonstration).
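The pattern from the two snippets above can be wrapped in a small helper. This is only a sketch: `CallerHelp` and `submitAndHelp` are hypothetical names, it assumes the caller is not itself a worker of `pool`, and the loop spins while the task is running elsewhere and nothing is pending. One caveat that seems worth keeping in mind: if the calling thread happens to pick up the submitted task itself, a parallel stream inside it would presumably fork its subtasks into the common pool again, since the caller is not a worker of the custom pool.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

final class CallerHelp {
    // Hypothetical helper, not a JDK API. Call it from a thread outside 'pool'.
    static <T> T submitAndHelp(ForkJoinPool pool, Callable<T> work) {
        ForkJoinTask<T> task = pool.submit(work);
        while (!task.isDone()) {
            // Zero timeout: run pending pool work on this thread if there is any,
            // then return without waiting. The caller cannot choose which task it gets.
            pool.awaitQuiescence(0, TimeUnit.NANOSECONDS);
        }
        return task.join(); // already done, so this returns or rethrows immediately
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            int sum = submitAndHelp(pool,
                    () -> IntStream.rangeClosed(1, 100).parallel().sum());
            System.out.println(sum);
        } finally {
            pool.shutdown();
        }
    }
}
```

As with the snippets above, a pending task that never finishes will trap the helping thread, so this only makes sense when every task in the pool is known to terminate.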


But note that the entire relationship between the Fork/Join framework and the Stream API is an implementation detail anyway.

Holger answered Oct 22 '22 21:10
