This question arose from the answer to another question, where map and reduce were suggested to calculate a sum concurrently.
In that question there's a complexCalculation(e), but now I was wondering how to parallelize even more by splitting the calculation into two parts, so that complexCalculation(e) = part1(e) * part2(e). I wonder whether it would be possible to calculate part1 and part2 on a collection concurrently (using map() again) and then zip the two resulting streams, so that the ith elements of both streams are combined with the function * and the resulting stream equals the stream obtained by mapping complexCalculation(e) on that collection. In code this would look like:
Stream map1 = bigCollection.parallelStream().map(e -> part1(e));
Stream map2 = bigCollection.parallelStream().map(e -> part2(e));
// preferably map1 and map2 are computed concurrently...
Stream result = map1.zip(map2, (e1, e2) -> e1 * e2);
result.equals(bigCollection.map(e -> complexCalculation(e))); //should be true
So my question is: does there exist some functionality like the zip function I tried to describe here?
The Stream.concat() method in Java creates a concatenated stream in which the elements are all the elements of the first stream followed by all the elements of the second stream. The resulting stream is ordered if both of the input streams are ordered, and parallel if either of the input streams is parallel. Note that this appends one stream to the other; it does not pair up elements the way a zip would.
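To make the difference from zip concrete, here is a minimal sketch of Stream.concat(). The class name ConcatDemo and the helper concat are made up for illustration; only Stream.concat() and Collectors.toList() come from the JDK.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the original question.
class ConcatDemo {
    // Appends the elements of 'second' after the elements of 'first'.
    static List<Integer> concat(List<Integer> first, List<Integer> second) {
        return Stream.concat(first.stream(), second.stream())
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The result has four elements, not two pairs.
        System.out.println(concat(Arrays.asList(1, 2), Arrays.asList(3, 4))); // [1, 2, 3, 4]
    }
}
```

So concat is useful for joining streams end to end, but it does not combine the ith elements of two streams.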
From the documentation: A stream should be operated on (invoking an intermediate or terminal stream operation) only once. A stream implementation may throw IllegalStateException if it detects that the stream is being reused. So the answer is no, streams are not meant to be reused.
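A small sketch of what that restriction looks like in practice. The class and method names are made up, and the documentation only says an implementation may throw; the behaviour shown here is what I would expect from the standard OpenJDK streams, which is an assumption about your runtime.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the original answers.
class StreamReuseDemo {
    // Returns true if operating on the same stream object twice is rejected.
    static boolean secondUseFails() {
        Stream<Integer> stream = Arrays.asList(1, 2, 3).stream();

        // First use: link an operation to the stream and consume it.
        List<Integer> doubled = stream.map(x -> x * 2).collect(Collectors.toList());

        try {
            // Second use of the SAME stream object.
            stream.map(x -> x + 1).collect(Collectors.toList());
            return false;
        } catch (IllegalStateException e) {
            // Reuse was detected by the stream implementation.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondUseFails());
    }
}
```

If you need the data twice, create a fresh stream from the source collection each time.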
Even though there is no zip function in Java 8, we can use the map function to achieve the goal.
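One way to read this, as a sketch rather than a definitive recipe: collect both intermediate results into lists and then map over the indices. The bodies of part1 and part2 below are placeholders invented for the example, as is the class name ZipByIndex.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class; part1/part2 are stand-ins for the real calculations.
class ZipByIndex {
    static long part1(int e) { return e + 1; }   // placeholder logic
    static long part2(int e) { return e * 2L; }  // placeholder logic

    static List<Long> zipProducts(List<Integer> items) {
        // Collecting an ordered source to a list keeps encounter order,
        // even when the stream is processed in parallel.
        List<Long> first = items.parallelStream().map(ZipByIndex::part1).collect(Collectors.toList());
        List<Long> second = items.parallelStream().map(ZipByIndex::part2).collect(Collectors.toList());

        // "Zip" by mapping each index to the combination of both elements.
        return IntStream.range(0, first.size())
                        .mapToObj(i -> first.get(i) * second.get(i))
                        .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(zipProducts(Arrays.asList(1, 2, 3))); // [4, 12, 24]
    }
}
```

Note that this materializes both intermediate lists and runs the two parallel pipelines one after the other, so it illustrates the zip itself more than any extra concurrency.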
Java streams enable functional-style operations on streams of elements. A stream is an abstraction of a non-mutable collection of functions applied in some order to the data. A stream is not a collection where you can store elements.
parallelStream() is not guaranteed to process elements in the order submitted. This means you cannot assume that two parallel streams can be zipped together like this.
Your original bigCollection.map(e -> complexCalculation(e)) is likely to be faster unless your collection is actually smaller than the number of CPUs you have.
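For reference, a minimal sketch of that single-pass approach, summing the results as in the question this one arose from. The class name and the bodies of part1 and part2 are placeholders made up for the example.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class; part1/part2 are stand-ins for the real calculations.
class SinglePass {
    static long part1(int e) { return e + 1; }   // placeholder logic
    static long part2(int e) { return e * 2L; }  // placeholder logic

    static long complexCalculation(int e) {
        return part1(e) * part2(e);
    }

    // Each element is handled by one task; the elements are spread across threads.
    static long sum(List<Integer> items) {
        return items.parallelStream()
                    .mapToLong(SinglePass::complexCalculation)
                    .sum();
    }

    public static void main(String[] args) {
        System.out.println(sum(Arrays.asList(1, 2, 3))); // 40
    }
}
```

With many elements, the parallelism across elements should already keep the cores busy, so splitting each element's work further mostly adds overhead.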
If you really want to parallelize part1 and part2 (for example, your bigCollection has very few elements, fewer than the number of CPU cores), you can use the following trick. Suppose you have two methods, part1 and part2, in the current class:
public long part1(Type t) { ... }
public long part2(Type t) { ... }
Create a stream of two functions created from these methods and process it in parallel like this:
bigCollection.parallelStream()
    .map(e -> Stream.<ToLongFunction<Type>>of(this::part1, this::part2)
        .parallel()
        .mapToLong(fn -> fn.applyAsLong(e))
        .reduce(1, (a, b) -> a * b))
    .// continue the outer stream operations
However, this is a very rare case. As @PeterLawrey noted, if your outer collection is big enough, there is no need to parallelize part1 and part2. Instead, separate elements will be handled in parallel.
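For completeness, here is a self-contained sketch of the nested-parallel trick that compiles on its own. It assumes Integer as the element type and uses made-up bodies for part1 and part2; the class name NestedParallel and the final sum are also just for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;
import java.util.stream.Stream;

// Hypothetical demo class; part1/part2 are stand-ins for the real calculations.
class NestedParallel {
    static long part1(Integer e) { return e + 1; }   // placeholder logic
    static long part2(Integer e) { return e * 2L; }  // placeholder logic

    // Evaluates both parts for one element in an inner parallel stream
    // and multiplies the two results.
    static long product(Integer e) {
        return Stream.<ToLongFunction<Integer>>of(NestedParallel::part1, NestedParallel::part2)
                     .parallel()
                     .mapToLong(fn -> fn.applyAsLong(e))
                     .reduce(1L, (a, b) -> a * b);
    }

    // Outer parallel stream over the elements, as in the snippet above.
    static long sumOfProducts(List<Integer> items) {
        return items.parallelStream()
                    .mapToLong(NestedParallel::product)
                    .sum();
    }

    public static void main(String[] args) {
        System.out.println(sumOfProducts(Arrays.asList(1, 2, 3))); // 40
    }
}
```

Multiplication is associative with identity 1, so the inner reduce gives the same result regardless of which part finishes first.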