Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Fork-Join Pool in parallel streams

Tags:

I had searched through various articles online and Stack Overflow questions but I am not able to find the perfect answer for this. Many questions are there which are close to this, but are slightly different.

We know Java 8 Streams API uses Fork-Join Pool internally.

Now my question is how the tasks in stream pipeline is divided using Fork-Join pool?

Suppose we have the following:

List myList =  inputList.parallelStream().filter( x -> x>0 )
    .map(x -> x+100 ).collect(Collectors.toList());

Now we have two options for dividing the tasks using thread pools.

  1. Take filter and map as a single task and run it using fork-join pool.
  2. Take filter and map as a two different tasks and run these using two different fork-join thread-pools.

Also I know that stream are lazily propagated so if we have a stateful intermediate operation in between as:

List myList2 = inputList.parallelStream().filter( x -> x>0 )
    .map(x -> x+5 ).sorted().map(x -> x+5 ).collect(Collectors.toList());

Then how will the thread-pools be created?

PS: I know the map function can be combined before. I just wanted to make an example for the question.

like image 455
Rishabh Agarwal Avatar asked Oct 21 '18 12:10

Rishabh Agarwal


1 Answers

First of all you have to use parallel for the Fork-Join Pool to be active. This answer explains a bit what Spliterators are how the splitting is performed; but in simple words splitting is done using the source of stream elements and an entire pipeline is processed in parallel. In your example it's filter and map as you put it (of course it includes the terminal operation also).

For stateful operations - things are more complicated. Let's take distinct for example and first see how it handles things for the sequential case.

In general you could think that a non-parallel distinct could be implemented using HashSet - and you would be correct. HashSet could hold all the values that have been already seen and simply not process (send to the next operations) the other elements - and theoretically you would be done with a non parallel distinct operation. But what if the Stream is known to be SORTED? Think about it, it means that we could keep a single element (as opposed to a HashSet as before) that would be marked as seen. Basically if you would have:

 1,1,2,2,3

it would mean that your stateful operation could be implemented on top of a single element - not a HashSet; the code would be something like:

T seen = null;
....
if(seen == null || (!currentElement.equals(seen)){
    seen = currentElement;
    // process seen;
}

But this optimization is only possible when you know that the stream is SORTED, since this way you know that the next element coming is either the same as the one you have already seen or a new one, that is impossible for you to have seen before in some other previous operation - this is guaranteed by a sort operation.

And now how is a parallel distinct implemented. You basically ask this question:

Then how will the thread-pools be created

The same way, nothing changes from a Stream perspective, ForkJoinPool uses the same number of threads - the only thing that changes is the stream implementation, obviously.

In simple words, if your Stream is ORDERED the internal implementation uses a LinkedHashSet (actually multiple instances of this, since it really does a reduction in such a case) to preserve your order and it uses a ConcurrentHashMap if you don't care about order - that is either if the source is not ordered (like a Set) or you used explicit called unordered. You can look up the implementation for sorted too, if you really want to know how it's done.


So the bottom line is that a Fork Join Pool does not change the implementation based on the stream, it uses the same model. On the other hand based on the operations that you have, the Stream API might use some stateful data for stateful intermediate operations, be that a HashSet/ConcurrentHashMap, or a single element, etc.

like image 58
Eugene Avatar answered Nov 15 '22 05:11

Eugene