I have searched through various articles online and Stack Overflow questions, but I have not been able to find a complete answer. Many questions come close to this one, but are slightly different.
We know the Java 8 Streams API uses a Fork-Join pool internally.
My question is: how are the tasks in a stream pipeline divided up using the Fork-Join pool?
Suppose we have the following:
List myList = inputList.parallelStream().filter(x -> x > 0)
        .map(x -> x + 100).collect(Collectors.toList());
Now we have two options for dividing the tasks using thread pools:
1. filter and map as a single task, run using the fork-join pool.
2. filter and map as two different tasks, run using two different fork-join thread pools.
Also, I know that streams are lazily evaluated, so what if we have a stateful intermediate operation in between, as in:
List myList2 = inputList.parallelStream().filter(x -> x > 0)
        .map(x -> x + 5).sorted().map(x -> x + 5).collect(Collectors.toList());
Then how will the thread-pools be created?
PS: I know the two map calls could be combined. I just wanted to make an example for the question.
First of all, you have to use parallel for the Fork-Join Pool to be active. This answer explains a bit what Spliterators are and how the splitting is performed; but in simple words, splitting is done using the source of stream elements, and the entire pipeline is processed in parallel. In your example it's filter and map as a single task, as you put it (of course it includes the terminal operation also).
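A minimal sketch of how one might observe this, assuming the OpenJDK stream implementation and an input list of distinct integers. The class name FusedPipelineDemo and the helper filterAndMapShareThread are made up for illustration. It records which thread ran filter and which ran map for each element, then checks that they match, which is what you would expect if both stages are part of one task per chunk of the source.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class FusedPipelineDemo {

    // Returns true if, for every element that reached map, map ran on the
    // same thread that ran filter for that element.
    // Assumes the elements of inputList are distinct (they are used as keys).
    static boolean filterAndMapShareThread(List<Integer> inputList) {
        Map<Integer, Thread> filterThread = new ConcurrentHashMap<>();
        Map<Integer, Thread> mapThread = new ConcurrentHashMap<>();

        inputList.parallelStream()
                .filter(x -> {
                    filterThread.put(x, Thread.currentThread());
                    return x > 0;
                })
                .map(x -> {
                    mapThread.put(x, Thread.currentThread());
                    return x + 100;
                })
                .collect(Collectors.toList());

        return mapThread.entrySet().stream()
                .allMatch(e -> e.getValue() == filterThread.get(e.getKey()));
    }

    public static void main(String[] args) {
        List<Integer> inputList = IntStream.rangeClosed(-500, 500)
                .boxed()
                .collect(Collectors.toList());
        System.out.println(filterAndMapShareThread(inputList));
    }
}
```

The side effects in the lambdas are there only to make the threading visible; they are not something you would want in production stream code.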
For stateful operations, things are more complicated. Let's take distinct as an example and first see how it handles things in the sequential case.
In general, you might think that a non-parallel distinct could be implemented using a HashSet, and you would be correct. The HashSet could hold all the values that have already been seen, and any element already in it would simply not be sent on to the next operations; theoretically, that is all a non-parallel distinct operation needs. But what if the Stream is known to be SORTED? Think about it: it means that we could keep a single element (as opposed to a HashSet as before) that would be marked as seen. Basically, if you had:
1,1,2,2,3
it would mean that your stateful operation could be implemented on top of a single element, not a HashSet; the code would be something like:
T seen = null;
....
if (seen == null || !currentElement.equals(seen)) {
    seen = currentElement;
    // process seen;
}
But this optimization is only possible when you know that the stream is SORTED, since then the next element is either equal to the one you have just seen or a new one that you cannot have seen at any earlier point; this is guaranteed by the sort order.
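To make the single-element idea concrete, here is a small runnable sketch. It is not the JDK's actual code; the class SortedDistinctSketch and the method distinctSorted are hypothetical names, and the sketch assumes the input is already sorted and contains no null elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SortedDistinctSketch {

    // Removes duplicates from an already sorted list while remembering only
    // the last element that was emitted (no HashSet needed).
    // Assumes: input is sorted and has no null elements.
    static <T> List<T> distinctSorted(List<T> sorted) {
        List<T> result = new ArrayList<>();
        T seen = null;
        for (T current : sorted) {
            if (seen == null || !current.equals(seen)) {
                seen = current;      // remember the newest distinct value
                result.add(current); // "process" it, i.e. pass it downstream
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(distinctSorted(Arrays.asList(1, 1, 2, 2, 3))); // prints [1, 2, 3]
    }
}
```

On unsorted input such as 1,2,1 this approach would let the second 1 through, which is why the single-element state is only valid for sorted streams.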
And now, how is a parallel distinct implemented? You basically ask this question:
Then how will the thread-pools be created
The same way. Nothing changes from a Stream perspective: the ForkJoinPool uses the same number of threads; the only thing that changes is the stream implementation, obviously.
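One rough way to check that no extra pool appears is to record the thread names on both sides of a stateful operation. The sketch below runs the question's second pipeline; SamePoolDemo and threadsUsed are illustrative names. It relies on the assumption that the stream is started from an ordinary thread (not from inside a custom ForkJoinPool) and that common-pool workers are named ForkJoinPool.commonPool-worker-N, as they are in OpenJDK.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SamePoolDemo {

    // Runs the pipeline from the question and returns the names of all
    // threads that executed a map lambda, before or after sorted().
    static Set<String> threadsUsed(List<Integer> inputList) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        inputList.parallelStream()
                .filter(x -> x > 0)
                .map(x -> { names.add(Thread.currentThread().getName()); return x + 5; })
                .sorted()
                .map(x -> { names.add(Thread.currentThread().getName()); return x + 5; })
                .collect(Collectors.toList());
        return names;
    }

    public static void main(String[] args) {
        List<Integer> inputList = IntStream.rangeClosed(-1000, 1000)
                .boxed()
                .collect(Collectors.toList());
        // Typically the calling thread plus some common-pool workers.
        threadsUsed(inputList).forEach(System.out::println);
    }
}
```

The calling thread usually shows up in the output as well, because it takes part in the work alongside the pool's workers.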
In simple words, if your Stream is ORDERED, the internal implementation uses a LinkedHashSet (actually multiple instances of it, since it really does a reduction in such a case) to preserve your order, and it uses a ConcurrentHashMap if you don't care about order, that is, either if the source is not ordered (like a Set) or you explicitly called unordered. You can look up the implementation of sorted too, if you really want to know how it's done.
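The observable difference between the two cases can be sketched like this. The class and method names (DistinctOrderDemo, orderedDistinct, unorderedDistinct) are invented for the example. With an ordered source such as a List, a parallel distinct keeps the first occurrence of each element in encounter order; after unordered(), only the set of surviving elements is guaranteed, not their order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class DistinctOrderDemo {

    // Ordered source (a List): distinct is stable, so the first occurrence
    // of each element is kept and encounter order is preserved.
    static List<Integer> orderedDistinct(List<Integer> input) {
        return input.parallelStream()
                .distinct()
                .collect(Collectors.toList());
    }

    // After unordered(), the same elements survive, but the order of the
    // resulting list is not guaranteed.
    static List<Integer> unorderedDistinct(List<Integer> input) {
        return input.parallelStream()
                .unordered()
                .distinct()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(3, 1, 3, 2, 1, 2);
        System.out.println(orderedDistinct(input));   // prints [3, 1, 2]
        System.out.println(unorderedDistinct(input)); // 1, 2 and 3 in some order
    }
}
```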
So the bottom line is that the Fork-Join Pool does not change its implementation based on the stream; it uses the same model. On the other hand, depending on the operations that you have, the Stream API might use some stateful data for stateful intermediate operations, be that a HashSet/ConcurrentHashMap, a single element, etc.
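For the sorted case from the question, the stateful data is the full set of elements: sorted cannot emit anything until it has received everything upstream, so it acts as a barrier inside the pipeline. The following sketch (SortedBarrierDemo and sortedActsAsBarrier are hypothetical names) counts invocations of the first map and checks that the second map only ever runs once that count is complete.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SortedBarrierDemo {

    // Returns true if every call of the map after sorted() happened only
    // after the map before sorted() had processed all filtered elements.
    static boolean sortedActsAsBarrier(List<Integer> inputList) {
        long expected = inputList.stream().filter(x -> x > 0).count();
        AtomicInteger beforeSort = new AtomicInteger();
        AtomicBoolean ranTooEarly = new AtomicBoolean(false);

        inputList.parallelStream()
                .filter(x -> x > 0)
                .map(x -> { beforeSort.incrementAndGet(); return x + 5; })
                .sorted()
                .map(x -> {
                    // If upstream were still running, the count would be short.
                    if (beforeSort.get() != expected) {
                        ranTooEarly.set(true);
                    }
                    return x + 5;
                })
                .collect(Collectors.toList());

        return !ranTooEarly.get();
    }

    public static void main(String[] args) {
        List<Integer> inputList = IntStream.rangeClosed(-1000, 1000)
                .boxed()
                .collect(Collectors.toList());
        System.out.println(sortedActsAsBarrier(inputList));
    }
}
```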