
Is there a way to zip two streams?

This question arose from the answer to another question, where map and reduce were suggested to calculate a sum concurrently.

That question used a complexCalculation(e), and I was wondering how to parallelise even further by splitting the calculation into two parts, so that complexCalculation(e) = part1(e) * part2(e). Would it be possible to calculate part1 and part2 on a collection concurrently (using map() again) and then zip the two resulting streams, so that the ith elements of both streams are combined with the function *? The resulting stream should equal the stream obtained by mapping complexCalculation(e) over that collection. In code this would look like:

Stream map1 = bigCollection.parallelStream().map(e -> part1(e));
Stream map2 = bigCollection.parallelStream().map(e -> part2(e));
// preferably map1 and map2 are computed concurrently...
Stream result = map1.zip(map2, (e1, e2) -> e1 * e2);

result.equals(bigCollection.map(e -> complexCalculation(e))); //should be true

So my question is: does there exist some functionality like the zip function I tried to describe here?

Mr Tsjolder asked Aug 21 '15 09:08



2 Answers

parallelStream() is not guaranteed to process elements in the order submitted. This means you cannot assume that two parallel streams can be zipped together like this.
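Pairing results by position is still possible once each stream has been collected into a list, because collecting an ordered stream into a list keeps the encounter order even when the work runs in parallel. The following is only a sketch of that idea: the class name IndexZipExample, the Integer element type, and the bodies of part1 and part2 are hypothetical placeholders, not taken from the question. Note that the two parallel streams here run one after the other, not concurrently with each other.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class IndexZipExample {
    // Hypothetical stand-ins for the two halves of complexCalculation.
    static long part1(int e) { return e + 1L; }
    static long part2(int e) { return e * 2L; }

    static List<Long> zipByIndex(List<Integer> source) {
        // Collecting to a list restores the encounter order of the source list.
        List<Long> first = source.parallelStream()
                .map(e -> part1(e))
                .collect(Collectors.toList());
        List<Long> second = source.parallelStream()
                .map(e -> part2(e))
                .collect(Collectors.toList());

        // "Zip" the two lists by walking their shared index range.
        return IntStream.range(0, first.size())
                .mapToObj(i -> first.get(i) * second.get(i))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(zipByIndex(Arrays.asList(1, 2, 3))); // prints [4, 12, 24]
    }
}
```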

Your original approach of mapping complexCalculation(e) over bigCollection.parallelStream() is likely to be faster, unless your collection actually has fewer elements than the number of CPUs you have.
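As a minimal sketch of that single-pass approach, each element can be handed to one worker that evaluates both parts. The class name SinglePassExample, the Integer element type, and the bodies of part1 and part2 are assumptions made for illustration only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SinglePassExample {
    // Hypothetical stand-ins for the two halves of complexCalculation.
    static long part1(int e) { return e + 1L; }
    static long part2(int e) { return e * 2L; }

    // Elements are spread over the worker threads; each worker
    // evaluates both parts for the elements it receives.
    static List<Long> compute(List<Integer> bigCollection) {
        return bigCollection.parallelStream()
                .map(e -> part1(e) * part2(e))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(compute(Arrays.asList(1, 2, 3))); // prints [4, 12, 24]
    }
}
```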

Peter Lawrey answered Sep 20 '22 14:09


If you really want to parallelize part1 and part2 (for example, because your bigCollection has very few elements, fewer than the number of CPU cores), you can use the following trick. Suppose you have two methods part1 and part2 in the current class:

public long part1(Type t) { ... }
public long part2(Type t) { ... }

Create a stream of two functions created from these methods and process it in parallel like this:

bigCollection.parallelStream()
    // for each element, run part1 and part2 as a two-element parallel stream
    .map(e -> Stream.<ToLongFunction<Type>>of(this::part1, this::part2)
            .parallel()
            .mapToLong(fn -> fn.applyAsLong(e))
            .reduce(1, (a, b) -> a * b))
    // ... continue the outer stream operations

However, this is a very rare case. As @PeterLawrey noted, if your outer collection is big enough, there is no need to parallelize part1 and part2. Instead, separate elements will be handled in parallel.
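For reference, a self-contained variant of the trick might look like the sketch below. It uses static methods and Integer elements in place of Type, and the class name NestedParallelExample and the bodies of part1 and part2 are hypothetical, chosen only so the example runs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class NestedParallelExample {
    // Hypothetical stand-ins for the two halves of complexCalculation.
    static long part1(Integer e) { return e + 1L; }
    static long part2(Integer e) { return e * 2L; }

    static List<Long> compute(List<Integer> smallCollection) {
        return smallCollection.parallelStream()
                // Inner stream: the two functions, applied to the same element in parallel.
                .map(e -> Stream.<ToLongFunction<Integer>>of(
                                NestedParallelExample::part1,
                                NestedParallelExample::part2)
                        .parallel()
                        .mapToLong(fn -> fn.applyAsLong(e))
                        // Multiply the two partial results; 1 is the identity for *.
                        .reduce(1L, (a, b) -> a * b))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(compute(Arrays.asList(1, 2, 3))); // prints [4, 12, 24]
    }
}
```

Multiplication is associative, so the reduce step gives the same product regardless of which function finishes first.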

Tagir Valeev answered Sep 18 '22 14:09