 

Why is a combiner needed for reduce method that converts type in java 8


What is combiner In reduce?

With parallel processing in mind, reduce() has an overload that takes a combiner function as an additional parameter. Stream reduce() can be used to get the sum of the numbers stored in a collection. It can also concatenate the strings stored in a collection with a given separator.
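Here is a short sketch of both uses, assuming a plain `List` as the source. The class `ReduceBasics` and its methods are illustrative names, not library API. Summing uses the two-argument `reduce(identity, accumulator)`. Joining uses the one-argument `reduce(accumulator)`, which returns an `Optional`, so no separator ends up in front of the first element.

```java
import java.util.Arrays;
import java.util.List;

public class ReduceBasics {
    // Sum with the two-argument reduce(identity, accumulator).
    static int sum(List<Integer> numbers) {
        return numbers.stream().reduce(0, Integer::sum);
    }

    // Concatenate with a separator. The one-argument reduce has no identity,
    // so the separator is only placed between elements; empty input gives "".
    static String join(List<String> words, String separator) {
        return words.stream()
                .reduce((left, right) -> left + separator + right)
                .orElse("");
    }

    public static void main(String[] args) {
        int total = sum(Arrays.asList(1, 2, 3, 4));
        String joined = join(Arrays.asList("a", "b", "c"), ", ");
        System.out.println(total);   // 10
        System.out.println(joined);  // a, b, c
    }
}
```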

What is a combiner in Java?

In Hadoop MapReduce, a Combiner, also known as a semi-reducer, is an optional class that accepts the output of the Map class and passes its own output key-value pairs on to the Reducer class. The main function of a Combiner is to summarize the map output records that share the same key. (This is a different concept from the combiner argument of Stream.reduce().)

What is reduce function in Java 8?

Java Stream reduction: a reduction is a terminal operation that aggregates a stream into a single value, either an object or a primitive. The Java 8 Stream API contains a set of predefined reduction operations, such as average, sum, min, max, and count, which return one value by combining the elements of a stream (average and sum are provided by the primitive streams such as IntStream).
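The predefined reductions can be tried directly on an `IntStream`. In this sketch, `PredefinedReductions`, `summarize` and `sumViaReduce` are hypothetical names, and it assumes a non-empty input because `min().getAsInt()` throws on an empty stream. The `IntStream.sum()` Javadoc describes `sum()` as equivalent to `reduce(0, Integer::sum)`, which the second method shows.

```java
import java.util.stream.IntStream;

public class PredefinedReductions {
    // Applies the predefined reductions to the same values. A new stream is
    // created for each call because a stream can be consumed only once.
    // Assumes at least one value (min/max would otherwise be empty).
    static String summarize(int... values) {
        int sum = IntStream.of(values).sum();
        double average = IntStream.of(values).average().orElse(0);
        int min = IntStream.of(values).min().getAsInt();
        int max = IntStream.of(values).max().getAsInt();
        long count = IntStream.of(values).count();
        return "sum=" + sum + " average=" + average + " min=" + min
                + " max=" + max + " count=" + count;
    }

    // sum() is documented as equivalent to reduce(0, Integer::sum).
    static int sumViaReduce(int... values) {
        return IntStream.of(values).reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(summarize(3, 1, 4, 1, 5));
        // sum=14 average=2.8 min=1 max=5 count=5
        System.out.println(sumViaReduce(3, 1, 4, 1, 5)); // 14
    }
}
```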

How does reduce work in Java stream?

Reducing is the repeated process of combining all elements. The reduce operation applies a binary operator to each element in the stream, where the first argument to the operator is the return value of the previous application and the second argument is the current stream element.
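The sequential meaning of the two-argument `reduce` can be written as a plain loop. `reduceLoop` below is a hypothetical helper written for illustration; it mirrors the equivalent loop given in the `Stream.reduce` Javadoc.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class ReduceAsLoop {
    // Sequential meaning of stream.reduce(identity, accumulator): the first
    // operand is the previous result, the second is the current element.
    static <T> T reduceLoop(List<T> elements, T identity, BinaryOperator<T> accumulator) {
        T result = identity;
        for (T element : elements) {
            result = accumulator.apply(result, element);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4);
        int viaLoop = reduceLoop(numbers, 0, Integer::sum);
        int viaStream = numbers.stream().reduce(0, Integer::sum);
        System.out.println(viaLoop + " " + viaStream); // 10 10
    }
}
```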


Eran's answer described the differences between the two-arg and three-arg versions of reduce in that the former reduces Stream<T> to T whereas the latter reduces Stream<T> to U. However, it didn't actually explain the need for the additional combiner function when reducing Stream<T> to U.

One of the design principles of the Streams API is that the API shouldn't differ between sequential and parallel streams, or put another way, a particular API shouldn't prevent a stream from running correctly either sequentially or in parallel. If your lambdas have the right properties (associative, non-interfering, etc.) a stream run sequentially or in parallel should give the same results.

Let's first consider the two-arg version of reduction:

T reduce(I, (T, T) -> T)

The sequential implementation is straightforward. The identity value I is "accumulated" with the zeroth stream element to give a result. This result is accumulated with the first stream element to give another result, which in turn is accumulated with the second stream element, and so forth. After the last element is accumulated, the final result is returned.

The parallel implementation starts off by splitting the stream into segments. Each segment is processed by its own thread in the sequential fashion I described above. Now, if we have N threads, we have N intermediate results. These need to be reduced down to one result. Since each intermediate result is of type T, and we have several, we can use the same accumulator function to reduce those N intermediate results down to a single result.
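To make this strategy concrete, the following sketch simulates it on a single thread. `segmentedReduce` and its `segments` parameter are hypothetical; the real stream implementation splits work through spliterators and the fork/join pool. The sketch assumes the identity is a true identity and the accumulator is associative, which is what allows the same accumulator to merge the partial results.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class SegmentedReduce {
    // Single-threaded simulation of parallel reduce for Stream<T> -> T:
    // reduce each segment sequentially, then reduce the partial results
    // with the SAME accumulator, because every partial result is a T.
    static <T> T segmentedReduce(List<T> elements, T identity,
                                 BinaryOperator<T> accumulator, int segments) {
        List<T> partials = new ArrayList<>();
        int size = elements.size();
        for (int s = 0; s < segments; s++) {
            int from = s * size / segments;
            int to = (s + 1) * size / segments;
            T partial = identity;
            for (T element : elements.subList(from, to)) {
                partial = accumulator.apply(partial, element);
            }
            partials.add(partial);
        }
        T result = identity;
        for (T partial : partials) {
            result = accumulator.apply(result, partial);
        }
        return result;
    }

    public static void main(String[] args) {
        int sum = segmentedReduce(Arrays.asList(1, 2, 3, 4, 5, 6), 0, Integer::sum, 3);
        String text = segmentedReduce(Arrays.asList("a", "b", "c", "d"), "", String::concat, 2);
        System.out.println(sum);  // 21
        System.out.println(text); // abcd
    }
}
```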

Now let's consider a hypothetical two-arg reduction operation that reduces Stream<T> to U. In other languages, this is called a "fold" or "fold-left" operation so that's what I'll call it here. Note this doesn't exist in Java.

U foldLeft(I, (U, T) -> U)

(Note that the identity value I is of type U.)

The sequential version of foldLeft is just like the sequential version of reduce except that the intermediate values are of type U instead of type T. But it's otherwise the same. (A hypothetical foldRight operation would be similar except that the operations would be performed right-to-left instead of left-to-right.)
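A sequential `foldLeft` can be sketched as a small helper. It is hypothetical, as the text says: no such method exists in `java.util.stream`. Its signature reuses the accumulator type `BiFunction<U, ? super T, U>` from the three-argument `reduce`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

public class FoldLeft {
    // Hypothetical helper, NOT part of java.util.stream:
    // folds T elements into a U, strictly left to right.
    static <T, U> U foldLeft(List<T> elements, U identity,
                             BiFunction<U, ? super T, U> accumulator) {
        U result = identity;
        for (T element : elements) {
            result = accumulator.apply(result, element);
        }
        return result;
    }

    public static void main(String[] args) {
        int totalLength = foldLeft(Arrays.asList("one", "two", "three", "four"), 0,
                (acc, s) -> acc + s.length());
        System.out.println(totalLength); // 15
    }
}
```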

Now consider the parallel version of foldLeft. Let's start off by splitting the stream into segments. We can then have each of the N threads reduce the T values in its segment into a single intermediate value of type U, giving N intermediate values. Now what? How do we get from N values of type U down to a single result of type U?

What's missing is another function that combines the multiple intermediate results of type U into a single result of type U. If we have a function that combines two U values into one, that's sufficient to reduce any number of values down to one -- just like the original reduction above. Thus, the reduction operation that gives a result of a different type needs two functions:
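Extending a single-threaded simulation to a type-changing fold shows where the combiner becomes necessary. `segmentedFold` is a hypothetical helper, not a JDK method; it assumes `combiner.apply(identity, u)` equals `u` and that the combiner is associative. Once each segment has been folded to a `U`, the accumulator cannot merge two `U` values, because its second parameter is a `T`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

public class SegmentedFold {
    // Hypothetical single-threaded simulation of a parallel Stream<T> -> U fold.
    static <T, U> U segmentedFold(List<T> elements, U identity,
                                  BiFunction<U, ? super T, U> accumulator,
                                  BinaryOperator<U> combiner, int segments) {
        List<U> partials = new ArrayList<>();
        int size = elements.size();
        for (int s = 0; s < segments; s++) {
            U partial = identity;
            for (T element : elements.subList(s * size / segments, (s + 1) * size / segments)) {
                partial = accumulator.apply(partial, element); // (U, T) -> U
            }
            partials.add(partial);
        }
        // The partial results are all U values, so the accumulator cannot
        // merge them; the separate (U, U) -> U combiner does that job.
        U result = identity;
        for (U partial : partials) {
            result = combiner.apply(result, partial);
        }
        return result;
    }

    public static void main(String[] args) {
        int total = segmentedFold(Arrays.asList("one", "two", "three", "four"), 0,
                (acc, s) -> acc + s.length(), Integer::sum, 2);
        System.out.println(total); // 15
    }
}
```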

U reduce(I, (U, T) -> U, (U, U) -> U)

Or, using Java syntax:

<U> U reduce(U identity, BiFunction<U,? super T,U> accumulator, BinaryOperator<U> combiner)

In summary, to do parallel reduction to a different result type, we need two functions: one that accumulates T elements to intermediate U values, and a second that combines the intermediate U values into a single U result. If we aren't switching types, it turns out that the accumulator function is the same as the combiner function. That's why reduction to the same type has only the accumulator function and reduction to a different type requires separate accumulator and combiner functions.
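The point about the accumulator doubling as the combiner can be checked with the real three-argument `reduce`. In this sketch (class and method names are illustrative), `Integer::sum` fills both roles when reducing a `Stream<Integer>`, while reducing a `Stream<String>` to an `Integer` needs two differently shaped functions.

```java
import java.util.Arrays;
import java.util.List;

public class ThreeArgReduce {
    // Stream<Integer> -> Integer: the accumulator (Integer, Integer) -> Integer
    // has the same shape as the combiner, so one function serves both roles.
    static int sumSameType(List<Integer> numbers) {
        return numbers.parallelStream().reduce(0, Integer::sum, Integer::sum);
    }

    // Stream<String> -> Integer: accumulator (Integer, String) -> Integer,
    // combiner (Integer, Integer) -> Integer.
    static int totalLength(List<String> words) {
        return words.parallelStream().reduce(0,
                (acc, s) -> acc + s.length(),
                Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(sumSameType(Arrays.asList(1, 2, 3, 4)));                // 10
        System.out.println(totalLength(Arrays.asList("one", "two", "three", "four"))); // 15
    }
}
```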

Finally, Java doesn't provide foldLeft and foldRight operations because they imply a particular ordering of operations that is inherently sequential. This clashes with the design principle stated above of providing APIs that support sequential and parallel operation equally.


Since I like doodles and arrows to clarify concepts... let's start!

From String to String (sequential stream)

Suppose you have four strings and your goal is to concatenate them into one. You basically start with a type and finish with the same type.

You can achieve this with

String res = Arrays.asList("one", "two","three","four")
        .stream()
        .reduce("",
                (accumulatedStr, str) -> accumulatedStr + str);  //accumulator

and this helps you to visualize what's happening:

[Image: diagram of the sequential String-to-String reduction]

The accumulator function converts, step by step, the elements in your (red) stream to the final reduced (green) value. The accumulator function simply transforms a String object into another String.
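To watch the accumulator work step by step, the sketch below records every intermediate value. The side effect inside the accumulator is only for illustration on a sequential stream; the Stream API expects accumulators to be stateless, so this pattern shouldn't be used with a parallel stream. `AccumulatorTrace` and `trace` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AccumulatorTrace {
    // Records every intermediate value the accumulator produces
    // during a sequential reduction.
    static List<String> trace(List<String> words) {
        List<String> steps = new ArrayList<>();
        words.stream()
                .reduce("", (accumulatedStr, str) -> {
                    String next = accumulatedStr + str;
                    steps.add(next); // illustration only: safe on a sequential stream
                    return next;
                });
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(trace(Arrays.asList("one", "two", "three", "four")));
        // [one, onetwo, onetwothree, onetwothreefour]
    }
}
```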

From String to int (parallel stream)

Suppose you have the same four strings, but now your goal is to sum their lengths, and you want to parallelize your stream.

What you need is something like this:

int length = Arrays.asList("one", "two","three","four")
        .parallelStream()
        .reduce(0,
                (accumulatedInt, str) -> accumulatedInt + str.length(),                 //accumulator
                (accumulatedInt, accumulatedInt2) -> accumulatedInt + accumulatedInt2); //combiner

and this is a scheme of what's happening

[Image: diagram of the parallel String-to-int reduction]

Here the accumulator function (a BiFunction) lets you transform your String data into int data. Because the stream is parallel, it is split into two (red) parts, each of which is processed independently of the other and produces its own partial (orange) result. A combiner is needed to provide a rule for merging the partial int results into the final (green) int.

From String to int (sequential stream)

What if you don't want to parallelize your stream? A combiner must be provided anyway, but in practice it is never invoked, since no partial results are produced.
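One way to observe this is to count combiner invocations. In this sketch (the names are hypothetical), OpenJDK's implementation reports zero combiner calls for the sequential run. The parallel count depends on how the stream gets split, so the code does not assume a particular value there, only that both runs produce the same total.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class CombinerCalls {
    // Sums string lengths and counts combiner invocations.
    // Returns {totalLength, combinerCalls}.
    static int[] sumLengths(List<String> words, boolean parallel) {
        AtomicInteger combinerCalls = new AtomicInteger(); // thread-safe counter
        int total = (parallel ? words.parallelStream() : words.stream())
                .reduce(0,
                        (accumulatedInt, str) -> accumulatedInt + str.length(),
                        (left, right) -> {
                            combinerCalls.incrementAndGet();
                            return left + right;
                        });
        return new int[] {total, combinerCalls.get()};
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("one", "two", "three", "four");
        int[] sequential = sumLengths(words, false);
        int[] parallel = sumLengths(words, true);
        System.out.println("sequential: total=" + sequential[0] + ", combiner calls=" + sequential[1]);
        System.out.println("parallel:   total=" + parallel[0] + ", combiner calls=" + parallel[1]);
    }
}
```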


The two and three argument versions of reduce which you tried to use don't accept the same type for the accumulator.

The two-argument reduce is defined as:

T reduce(T identity,
         BinaryOperator<T> accumulator)

In your case, T is String, so BinaryOperator<T> should accept two String arguments and return a String. But you pass to it an int and a String, which results in the compilation error you got - argument mismatch; int cannot be converted to java.lang.String. Actually, I think passing 0 as the identity value is also wrong here, since a String is expected (T).

Also note that this version of reduce processes a stream of Ts and returns a T, so you can't use it to reduce a stream of String to an int.

The three-argument reduce is defined as:

<U> U reduce(U identity,
             BiFunction<U,? super T,U> accumulator,
             BinaryOperator<U> combiner)

In your case U is Integer and T is String, so this method will reduce a stream of String to an Integer.

For the BiFunction<U,? super T,U> accumulator you can pass parameters of two different types (U and ? super T), which in your case are Integer and String. In addition, the identity value has type U, which is Integer in your case, so passing it 0 is fine.
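Spelling out the type of each argument can make the roles of `U` and `T` more visible. This sketch (class and method names are illustrative) declares the identity, accumulator and combiner with explicit types, where `U` is `Integer` and `T` is `String`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

public class ExplicitTypes {
    static int totalLength(List<String> words) {
        Integer identity = 0;                                   // U identity
        BiFunction<Integer, String, Integer> accumulator =      // (U, T) -> U
                (acc, s) -> acc + s.length();
        BinaryOperator<Integer> combiner = Integer::sum;        // (U, U) -> U
        return words.stream().reduce(identity, accumulator, combiner);
    }

    public static void main(String[] args) {
        System.out.println(totalLength(Arrays.asList("str1", "str2"))); // 8
    }
}
```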

Another way to achieve what you want:

int length = Arrays.asList("str1", "str2").stream()
            .mapToInt(String::length)
            .reduce(0, (accumulatedInt, len) -> accumulatedInt + len);

Here mapToInt produces an IntStream, so the element type (int) matches the result type of reduce, and you can use the two-parameter version of reduce.
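For comparison, mapping to a boxed `Stream<Integer>` instead of an `IntStream` also lets the two-argument `reduce` work, since the element type and the result type are both `Integer`. The names below are hypothetical; the `IntStream` version above avoids boxing each length.

```java
import java.util.Arrays;
import java.util.List;

public class MapThenReduce {
    // Stream<String> -> Stream<Integer>, then the two-argument reduce,
    // because element type and result type now match (T = Integer).
    static int totalLength(List<String> words) {
        return words.stream()
                .map(String::length)
                .reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(totalLength(Arrays.asList("str1", "str2"))); // 8
    }
}
```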

Of course, you don't have to use reduce at all:

int length = Arrays.asList("str1", "str2").stream()
            .mapToInt(String::length)
            .sum();