Make Stream Parallel on the Result of flatMap

Consider the following simple code:

import java.util.stream.IntStream;
import java.util.stream.Stream;

Stream.of(1)
  .flatMap(x -> IntStream.range(0, 1024).boxed())
  .parallel() // Moving this before flatMap has the same effect because it's just a property of the entire stream
  .forEach(x -> {
     System.out.println("Thread: " + Thread.currentThread().getName());
  });

For a long time, I assumed that Java would process the elements produced by flatMap in parallel. But the code above prints "Thread: main" for every element, which proves that assumption wrong.
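A minimal way to check this without reading 1024 printed lines is to collect the thread names into a concurrent set. The class and method names below are hypothetical; the pipeline itself is the one from the question. With a single-element source, the set should contain only one name (the thread that calls forEach, e.g. main):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class SingleSourceDemo {
    // Runs the question's pipeline and returns the names of all threads
    // that executed the terminal forEach.
    static Set<String> threadsUsed() {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        Stream.of(1)
            .flatMap(x -> IntStream.range(0, 1024).boxed()) // 1 element becomes 1024
            .parallel()
            .forEach(x -> threads.add(Thread.currentThread().getName()));
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("Distinct threads: " + threadsUsed());
    }
}
```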

A simple way to make the stage after flatMap run in parallel is to collect the results and then stream them again:

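import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

Stream.of(1)
  .flatMap(x -> IntStream.range(0, 1024).boxed())
  .parallel() // Moving this before flatMap has the same effect because it's just a property of the entire stream
  .collect(Collectors.toList())   // materialize all 1024 elements into a List
  .parallelStream()               // a List-backed stream can be split by index across threads
  .forEach(x -> {
     System.out.println("Thread: " + Thread.currentThread().getName());
  });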
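The same workaround can be wrapped in a small helper. This is only a sketch: `reparallelize` and `Reparallelize` are hypothetical names, not JDK API. It trades memory for splittability, since every element is buffered in an ArrayList (whose spliterator splits by index) before any parallel work starts. How many distinct threads show up depends on the number of cores and on timing; on a single-core machine it may still be one.

```java
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class Reparallelize {
    // Hypothetical helper (not a JDK method): buffers every element, then
    // re-streams the buffer in parallel. An ArrayList spliterator can split
    // by index, so the common pool can hand chunks to different threads.
    static <T> Stream<T> reparallelize(Stream<T> stream) {
        ArrayList<T> buffered = stream.collect(Collectors.toCollection(ArrayList::new));
        return buffered.parallelStream();
    }

    // Explodes one source element into the values 0..1023, re-parallelizes
    // them, and sums them so the result can be checked.
    static long explodeAndSum() {
        return reparallelize(Stream.of(1).flatMap(x -> IntStream.range(0, 1024).boxed()))
                .mapToLong(Integer::longValue)
                .sum();
    }

    // Collects the names of the threads that processed the re-streamed elements.
    static Set<String> threadsUsed() {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        reparallelize(Stream.of(1).flatMap(x -> IntStream.range(0, 1024).boxed()))
                .forEach(x -> threads.add(Thread.currentThread().getName()));
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("Sum: " + explodeAndSum());
        System.out.println("Distinct threads: " + threadsUsed());
    }
}
```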

I was wondering whether there is a better way, and about the design choice of flatMap that only parallelizes the stream before the call, but not after the call.

========= More Clarification about the Question ========

Judging from some of the answers, my question has not come across fully. As @Andreas said, if I start with a Stream of 3 elements, there can be 3 threads running.

But my real question is this: Java streams run on the common ForkJoinPool, whose default parallelism is one less than the number of cores, according to this post. Suppose I have 64 cores; I would then expect the code above to show many different threads after flatMap, but in fact it shows only one (or 3 in Andreas' case). By the way, I did use isParallel to confirm that the stream is parallel.
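The isParallel observation is consistent with the answer below: the flag only says the pipeline will be evaluated in parallel, not that the work will actually be spread across threads. A short sketch (hypothetical class and method names) that prints the flag next to the common pool's parallelism. That parallelism defaults to availableProcessors() - 1 unless the system property java.util.concurrent.ForkJoinPool.common.parallelism overrides it, and the thread that calls the terminal operation also takes part in the work.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class ParallelFlagDemo {
    // parallel() sets a flag on the whole pipeline, so isParallel() is true
    // even though the single-element source cannot be split.
    static boolean flatMappedStreamIsParallel() {
        Stream<Integer> s = Stream.of(1)
            .parallel()
            .flatMap(x -> IntStream.range(0, 1024).boxed());
        return s.isParallel();
    }

    public static void main(String[] args) {
        System.out.println("isParallel: " + flatMappedStreamIsParallel());
        System.out.println("Available processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
    }
}
```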

To be honest, I'm not asking out of purely academic interest. I ran into this problem in a project with a long chain of stream operations that transforms a dataset. The chain starts with a single file and explodes into a large number of elements through flatMap. But in my experiment it does NOT fully exploit my machine (which has 64 cores); judging from CPU usage, it uses only one core.

asked Nov 06 '22 at 05:11 by zico


1 Answer

I was wondering [...] about the design choice of flatMap that only parallelizes the stream before the call, but not after the call.

You're mistaken. All steps, both before and after the flatMap, run in parallel, but only the original stream is split between threads. Each flatMap call is then handled by the single thread that received that source element, and the stream it returns isn't split.

Since your original stream only has 1 element, it cannot be split, and hence parallel has no effect.
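The splitting behaviour can be checked directly on the spliterators. In this sketch (class and method names are hypothetical), trySplit() returns null for a one-element source, and also for the parallel flatMap pipeline built on it, because splitting a pipeline is delegated to its source; a three-element source does split.

```java
import java.util.Spliterator;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class SplitDemo {
    // A single-element source has nothing to hand to another thread.
    static boolean singleSourceSplits() {
        Spliterator<Integer> sp = Stream.of(1).spliterator();
        return sp.trySplit() != null;
    }

    // An array-backed source of three elements can be divided.
    static boolean threeElementSourceSplits() {
        Spliterator<Integer> sp = Stream.of(1, 2, 3).spliterator();
        return sp.trySplit() != null;
    }

    // Splitting a parallel pipeline asks the source spliterator to split,
    // so the 1024 elements produced by flatMap cannot be divided either.
    static boolean flatMappedSplits() {
        Spliterator<Integer> sp = Stream.of(1)
            .parallel()
            .flatMap(x -> IntStream.range(0, 1024).boxed())
            .spliterator();
        return sp.trySplit() != null;
    }

    public static void main(String[] args) {
        System.out.println("Stream.of(1) splits: " + singleSourceSplits());
        System.out.println("Stream.of(1, 2, 3) splits: " + threeElementSourceSplits());
        System.out.println("flatMap pipeline splits: " + flatMappedSplits());
    }
}
```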

Try changing to Stream.of(1, 2, 3), and you will see that the forEach, which comes after the flatMap, actually runs in up to 3 different threads.
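A sketch that makes this visible by tagging every element produced by flatMap with the source element it came from (names are hypothetical). Each source element should map to exactly one thread, even though the inner stream is explicitly made parallel here: as far as I can tell from the OpenJDK source, ReferencePipeline.flatMap consumes each inner stream with result.sequential().forEach(...), so requesting parallelism inside flatMap has no effect. Across the three source elements you may see up to three distinct threads.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class PerElementThreads {
    // Maps each source element to the set of threads that processed
    // the 1024 elements flatMap produced from it.
    static Map<Integer, Set<String>> threadsPerSourceElement() {
        Map<Integer, Set<String>> seen = new ConcurrentHashMap<>();
        Stream.of(1, 2, 3)
            .parallel()
            // Tag every inner element with its source element; the inner
            // parallel() is ignored because flatMap runs the inner stream sequentially.
            .flatMap(src -> IntStream.range(0, 1024).parallel().mapToObj(i -> src))
            .forEach(src -> seen
                .computeIfAbsent(src, k -> ConcurrentHashMap.newKeySet())
                .add(Thread.currentThread().getName()));
        return seen;
    }

    public static void main(String[] args) {
        threadsPerSourceElement().forEach((src, threads) ->
            System.out.println("source " + src + " -> " + threads));
    }
}
```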

answered Nov 09 '22 at 14:11 by Andreas