
Is there anything wrong with using I/O + ManagedBlocker in Java 8 parallelStream()?

The default parallelStream() in Java 8 uses the common ForkJoinPool, which may be a latency problem if the common pool's threads are exhausted when a task is submitted. However, in many cases enough CPU power is available and the tasks are short enough that this is not a problem. If we do have some long-running tasks, this will of course need careful consideration, but for this question let's assume that this is not the problem.

However, filling the ForkJoinPool with I/O tasks that don't actually do any CPU-bound work is a way to introduce a bottleneck even though enough CPU power is available. I understand that. But that is what we have the ManagedBlocker for: if we have an I/O task, we should simply let the ForkJoinPool manage it within a ManagedBlocker. That sounds incredibly easy. However, to my surprise, ManagedBlocker is a rather complicated API for the simple thing that it does. And after all, I think this is a common problem. So I just built a simple utility method that makes ManagedBlockers easy to use for the common case:

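```java
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

public class BlockingTasks {

    // Runs supplier.get() inside ForkJoinPool.managedBlock so that the pool
    // may start a compensating worker while the call blocks.
    public static <T> T callInManagedBlock(final Supplier<T> supplier) {
        final SupplierManagedBlock<T> managedBlock = new SupplierManagedBlock<>(supplier);
        try {
            ForkJoinPool.managedBlock(managedBlock);
        } catch (InterruptedException e) {
            throw new Error(e);
        }
        return managedBlock.getResult();
    }

    private static class SupplierManagedBlock<T> implements ForkJoinPool.ManagedBlocker {
        private final Supplier<T> supplier;
        private T result;
        private boolean done = false;

        private SupplierManagedBlock(final Supplier<T> supplier) {
            this.supplier = supplier;
        }

        @Override
        public boolean block() {
            // Performs the (possibly blocking) call; returning true means no further blocking is needed
            result = supplier.get();
            done = true;
            return true;
        }

        @Override
        public boolean isReleasable() {
            // No cheap non-blocking check exists for a Supplier, so this only reports completion
            return done;
        }

        public T getResult() {
            return result;
        }
    }
}
```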

Now if I want to download the HTML code of a couple of websites in parallel, I could do it like this without the I/O causing any trouble:

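```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public static void main(String[] args) {
    // download(url) is a blocking helper (defined elsewhere) that returns the page HTML
    final List<String> pagesHtml = Stream
        .of("https://google.com", "https://stackoverflow.com", "...")
        .parallel() // Stream.of(...) is sequential unless parallel() is requested
        .map((url) -> BlockingTasks.callInManagedBlock(() -> download(url)))
        .collect(Collectors.toList());
}
```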

I am a little bit surprised that no class like the BlockingTasks above ships with Java (or did I just not find it?), but it was not that hard to build.
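The two-method shape of ForkJoinPool.ManagedBlocker is easier to understand with a blocker that can check for readiness without blocking. The sketch below is adapted from the QueueTaker example in the ManagedBlocker Javadoc; QueueTakerDemo and takeManaged are illustrative names, not JDK API. isReleasable() first tries a non-blocking poll(), so the pool only needs to compensate when block() really has to wait in take(). For an arbitrary Supplier there is no such cheap check, which is why BlockingTasks can only report whether block() has finished.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueTakerDemo {

    // Adapted from the QueueTaker example in the ForkJoinPool.ManagedBlocker Javadoc
    static class QueueTaker<E> implements ForkJoinPool.ManagedBlocker {
        final BlockingQueue<E> queue;
        volatile E item = null;

        QueueTaker(BlockingQueue<E> q) {
            this.queue = q;
        }

        // Only reached if isReleasable() returned false: waits until an item arrives
        @Override
        public boolean block() throws InterruptedException {
            if (item == null) {
                item = queue.take();
            }
            return true;
        }

        // Non-blocking check: if an item is already available, no compensation is needed
        @Override
        public boolean isReleasable() {
            return item != null || (item = queue.poll()) != null;
        }

        E getItem() {
            return item;
        }
    }

    // Hypothetical helper: takes one element, letting the pool compensate if take() blocks
    static <E> E takeManaged(BlockingQueue<E> queue) throws InterruptedException {
        QueueTaker<E> taker = new QueueTaker<>(queue);
        ForkJoinPool.managedBlock(taker);
        return taker.getItem();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("hello");
        System.out.println(takeManaged(queue));
    }
}
```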

When I google for "java 8 parallel stream", the first four results include these articles, which claim that Fork/Join in Java sucks because of the I/O problem:

  • https://dzone.com/articles/think-twice-using-java-8
  • http://zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health/ (this one at least mentions ManagedBlocker, but also says "in a different use case you’d be able to give it a ManagedBlocker instance". It does not explain why not in this case.)

I have altered my search terms somewhat, and while there are a lot of people complaining about how horrible life is, I found nobody talking about a solution like the one above. Since I don't feel like Marvin (brain the size of a planet) and Java 8 has been available for quite a while, I suspect that there is something terribly wrong with what I am proposing.

I banged together a small test:

```java
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.stream.IntStream;

public static void main(String[] args) {
    System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Start");
    IntStream.range(0, 10).parallel().forEach((x) -> sleep());
    System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": End");
}

// Mock "blocking I/O": logs the current thread, then sleeps for 10 seconds
public static void sleep() {
    try {
        System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Sleeping " + Thread.currentThread().getName());
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        throw new Error(e);
    }
}
```

I ran that and got the following result:

```
18:41:29.021: Start
18:41:29.033: Sleeping main
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-1
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-2
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-5
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-4
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-6
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-3
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-7
18:41:39.034: Sleeping main
18:41:39.034: Sleeping ForkJoinPool.commonPool-worker-1
18:41:49.035: End
```

So on my 8-CPU computer the parallel stream ran 8 tasks at a time (the 7 common-pool workers plus the calling main thread): it completed the first 8 tasks and then the last two, so the whole run took 20 seconds. If other tasks had been queued, the pool still could not have used the clearly idle CPUs (except for 6 cores during the last 10 seconds).
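The 20 seconds follow from simple arithmetic: if every task only sleeps, 10 tasks on 8 threads need two 10-second waves. A small sketch of that model (SleepTimeEstimate is a hypothetical name) that also prints this machine's common pool parallelism. It assumes the calling thread runs tasks too, as "Sleeping main" in the log shows, and it ignores how the stream actually splits the work.

```java
import java.util.concurrent.ForkJoinPool;

public class SleepTimeEstimate {

    // Rough model: sleeping tasks run in waves of `threads` at a time,
    // so wall-clock time is ceil(tasks / threads) * sleepSeconds
    static long estimateSeconds(int tasks, int threads, long sleepSeconds) {
        long waves = (tasks + threads - 1) / threads; // ceiling division
        return waves * sleepSeconds;
    }

    public static void main(String[] args) {
        int parallelism = ForkJoinPool.getCommonPoolParallelism();
        // Assumption: common-pool workers plus the calling thread execute tasks
        int threads = parallelism + 1;
        System.out.println("availableProcessors = " + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism = " + parallelism);
        System.out.println("estimate for 10 x 10 s sleeps: " + estimateSeconds(10, threads, 10) + " s");
    }
}
```

With 8 threads this gives 20 s; if each of the 10 tasks gets its own thread, it gives 10 s.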

Then I used...

```java
IntStream.range(0, 10).parallel().forEach((x) -> callInManagedBlock(() -> { sleep(); return null; }));
```

...instead of...

```java
IntStream.range(0, 10).parallel().forEach((x) -> sleep());
```

...and got the following result:

```
18:44:10.93: Start
18:44:10.945: Sleeping main
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-7
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-1
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-6
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-3
18:44:10.955: Sleeping ForkJoinPool.commonPool-worker-2
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-4
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-5
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-0
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-11
18:44:20.957: End
```

It looks to me like this works: extra threads were started to compensate for my mock "blocking I/O action" (sleep). The time was cut down to 10 seconds, and I suppose that if I queued more tasks, those could still use the available CPU power.
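The `() -> { sleep(); return null; }` wrapper in the second run is only needed because callInManagedBlock expects a Supplier. A Runnable variant avoids it. This is a minimal sketch; BlockingRunnables and runInManagedBlock are hypothetical names, and unlike the utility above it restores the thread's interrupt status and throws an unchecked exception instead of an Error.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class BlockingRunnables {

    // Hypothetical helper: runs a blocking Runnable inside ForkJoinPool.managedBlock
    public static void runInManagedBlock(Runnable action) {
        ForkJoinPool.ManagedBlocker blocker = new ForkJoinPool.ManagedBlocker() {
            private volatile boolean done = false;

            @Override
            public boolean block() {
                action.run();   // the blocking call itself
                done = true;
                return true;    // no further blocking needed
            }

            @Override
            public boolean isReleasable() {
                return done;    // no non-blocking shortcut for an arbitrary Runnable
            }
        };
        try {
            ForkJoinPool.managedBlock(blocker);
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of swallowing it
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while blocking", e);
        }
    }

    public static void main(String[] args) {
        AtomicInteger completed = new AtomicInteger();
        IntStream.range(0, 10).parallel().forEach(x -> runInManagedBlock(() -> {
            try {
                Thread.sleep(100); // stand-in for blocking I/O
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            completed.incrementAndGet();
        }));
        System.out.println("completed = " + completed.get());
    }
}
```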

Is there anything wrong with this solution, or in general with using I/O in streams if the I/O operation is wrapped in a ManagedBlocker?

yankee asked May 29 '16 17:05



1 Answer

In short, yes, there are some problems with your solution. It definitely improves the use of blocking code inside a parallel stream, and some third-party libraries provide a similar solution (see, for example, the Blocking class in the jOOλ library). However, this solution does not change the internal splitting strategy used by the Stream API. The number of subtasks created by the Stream API is controlled by a predefined constant in the AbstractTask class:

```java
/**
 * Default target factor of leaf tasks for parallel decomposition.
 * To allow load balancing, we over-partition, currently to approximately
 * four tasks per processor, which enables others to help out
 * if leaf tasks are uneven or some processors are otherwise busy.
 */
static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;
```

As you can see, it's four times the common pool parallelism (which by default is the number of CPU cores minus one). The real splitting algorithm is a bit trickier, but roughly you cannot get more than 4x-8x as many tasks as the parallelism, even if all of them are blocking.

For example, if you have 8 CPU cores, your Thread.sleep() test will work nicely up to IntStream.range(0, 32) (32 = 8*4; with the default parallelism of 7, LEAF_TARGET is 28, and each leaf still gets a single element). However, for IntStream.range(0, 64) you will get 32 parallel tasks, each processing two input numbers, so the whole processing would take 20 seconds, not 10.
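To see where these numbers come from, here is a simplified model of the splitting: the target leaf size is max(n / LEAF_TARGET, 1), and the range is halved until every piece is no larger than that. This is a sketch, not the JDK's code: LeafTaskEstimate and split are hypothetical names, it assumes even halving (roughly what IntStream.range's spliterator does for small ranges), and it assumes every leaf gets its own compensating thread, so wall time is about the largest leaf times 10 s.

```java
public class LeafTaskEstimate {

    // Mirrors LEAF_TARGET = parallelism << 2 and a target leaf size of max(n / LEAF_TARGET, 1)
    static long targetSize(long n, int parallelism) {
        long est = n / (parallelism << 2);
        return est > 0 ? est : 1;
    }

    // Halves a range of `size` elements until each piece is <= threshold;
    // returns {number of leaves, size of the largest leaf}
    static long[] split(long size, long threshold) {
        if (size <= threshold || size <= 1) {
            return new long[] {1, size};
        }
        long left = size / 2;
        long[] a = split(left, threshold);
        long[] b = split(size - left, threshold);
        return new long[] {a[0] + b[0], Math.max(a[1], b[1])};
    }

    public static void main(String[] args) {
        int parallelism = 8; // assumed, matching the "8 CPU cores" example
        for (int n : new int[] {10, 32, 64}) {
            long[] r = split(n, targetSize(n, parallelism));
            // One thread per leaf, 10 s per element: wall time is about largestLeaf * 10 s
            System.out.println("range(0, " + n + "): " + r[0] + " leaves, largest leaf = "
                    + r[1] + " -> about " + (r[1] * 10) + " s");
        }
    }
}
```

For these three sizes, a parallelism of 7 (the actual default on 8 cores) yields the same leaf counts: 10 and 32 single-element leaves, and for 64 elements 32 leaves of two elements each.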

Tagir Valeev answered Oct 04 '22 05:10