
Can a Collector's combiner function ever be used on sequential streams?

Sample program:

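import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.stream.Collector;
import java.util.stream.IntStream;

public final class CollectorTest {
    private CollectorTest()
    {
    }

    // A combiner that fails loudly if it is ever invoked.
    private static <T> BinaryOperator<T> nope()
    {
        return (t, u) -> { throw new UnsupportedOperationException("nope"); };
    }

    public static void main(final String... args)
    {
        final Collector<Integer, ?, List<Integer>> c
            = Collector.of(ArrayList::new, List::add, nope());

        IntStream.range(0, 10_000_000).boxed().collect(c);
    }
}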

To keep things simple, there is no final transformation (finisher), so the resulting code is quite short.

Now, IntStream.range() produces a sequential stream. I simply box the results into Integers and then my contrived Collector collects them into a List<Integer>. Pretty simple.

And no matter how many times I run this sample program, the UnsupportedOperationException is never thrown, which means my dummy combiner is never called.

I kind of expected this, but then I have already misunderstood streams enough that I have to ask the question...

Can a Collector's combiner ever be called when the stream is guaranteed to be sequential?

fge asked Mar 23 '15 12:03



2 Answers

A careful reading of the streams implementation code in ReduceOps.java reveals that the combine function is called only when a ReduceTask completes, and ReduceTask instances are used only when evaluating a pipeline in parallel. Thus, in the current implementation, the combiner is never called when evaluating a sequential pipeline.

There is nothing in the specification that guarantees this, however. A Collector is an interface that makes requirements on its implementations, and there are no exemptions granted for sequential streams. Personally, I find it difficult to imagine why sequential pipeline evaluation might need to call the combiner, but someone with more imagination than me might find a clever use for it, and implement it. The specification allows for it, and even though today's implementation doesn't do it, you still have to think about it.

This should not be surprising. The design center of the streams API is to support parallel execution on an equal footing with sequential execution. Of course, it is possible for a program to observe whether it is being executed sequentially or in parallel. But the design of the API is to support a style of programming that allows either.
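The current-implementation behavior described earlier in this answer can be observed by counting combiner invocations. The following is only a sketch: the class and method names (CombinerCallCounter, countCombinerCalls) are made up for illustration, and a count of zero for the sequential case is an implementation detail of today's OpenJDK streams code, not something the specification promises.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.IntStream;

public final class CombinerCallCounter {
    private CombinerCallCounter() {}

    /** Collects 0..9999 into a list and returns how many times the combiner ran. */
    static int countCombinerCalls(boolean parallel) {
        AtomicInteger calls = new AtomicInteger();

        // A well-behaved (associative) combiner that also records each invocation.
        Collector<Integer, List<Integer>, List<Integer>> collector = Collector.of(
            ArrayList::new,
            List::add,
            (left, right) -> {
                calls.incrementAndGet();
                left.addAll(right);
                return left;
            });

        IntStream range = IntStream.range(0, 10_000);
        (parallel ? range.parallel() : range).boxed().collect(collector);
        return calls.get();
    }

    public static void main(String[] args) {
        // The sequential count reflects the current implementation only.
        System.out.println("sequential: " + countCombinerCalls(false));
        // The parallel count depends on how the source happens to be split.
        System.out.println("parallel:   " + countCombinerCalls(true));
    }
}
```

Because the combiner here is a correct one, the collector stays valid whichever way the pipeline is evaluated; the counter only makes the evaluation strategy visible.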

If you're writing a collector and you find that it's impossible (or inconvenient, or difficult) to write an associative combiner function, leading you to want to restrict your stream to sequential execution, maybe this means you're heading in the wrong direction. It's time to step back a bit and think about approaching the problem a different way.

A common reduction-style operation that doesn't require an associative combiner function is called fold-left. The main characteristic is that the fold function is applied strictly left-to-right, proceeding one at a time. I'm not aware of a way to parallelize fold-left.

When people try to contort collectors the way we've been talking about, they're usually looking for something like fold-left. The Streams API doesn't have direct API support for this operation, but it's pretty easy to write. For example, suppose you want to reduce a list of strings using this operation: repeat the first string and then append the second. It's pretty easy to demonstrate that this operation isn't associative:

List<String> list = Arrays.asList("a", "b", "c", "d", "e");

System.out.println(list.stream()
    .collect(StringBuilder::new,
             (a, b) -> a.append(a.toString()).append(b),
             (a, b) -> a.append(a.toString()).append(b))); // BROKEN -- NOT ASSOCIATIVE

Run sequentially, this produces the desired output:

aabaabcaabaabcdaabaabcaabaabcde 

But when run in parallel, it might produce something like this:

aabaabccdde 
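The lack of associativity can also be checked directly on plain strings, without any stream. This is a small sketch; op is a hypothetical stand-in for "repeat the first string, then append the second", not a method from any library.

```java
public final class AssociativityCheck {
    private AssociativityCheck() {}

    /** Models the operation from the text: repeat the first string, then append the second. */
    static String op(String first, String second) {
        return first + first + second;
    }

    public static void main(String[] args) {
        String leftGrouped = op(op("a", "b"), "c");   // op("aab", "c")
        String rightGrouped = op("a", op("b", "c"));  // op("a", "bbc")

        System.out.println(leftGrouped);                       // aabaabc
        System.out.println(rightGrouped);                      // aabbc
        System.out.println(leftGrouped.equals(rightGrouped));  // false
    }
}
```

Since the two groupings disagree, the result of a parallel reduction depends on how the input happens to be partitioned.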

Since it "works" sequentially, we could enforce this by calling sequential() and back this up by having the combiner throw an exception. In addition, the supplier must be called exactly once. There's no way to combine the intermediate results, so if the supplier is called twice, we're already in trouble. But since we "know" the supplier is called only once in sequential mode, most people don't worry about this. In fact, I've seen people write "suppliers" that return some existing object instead of creating a new one, in violation of the supplier contract.

In this use of the 3-arg form of collect(), we have two out of the three functions breaking their contracts. Shouldn't this be telling us to do things a different way?
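How often the supplier actually runs can be observed by counting its invocations. This is a sketch with made-up names (SupplierCallCounter, countSupplierCalls); a single call in the sequential case is what the current OpenJDK implementation does, and should be treated as an assumption rather than a guarantee.

```java
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public final class SupplierCallCounter {
    private SupplierCallCounter() {}

    /** Runs a 3-arg collect() over 0..9999 and returns how many containers were requested. */
    static int countSupplierCalls(boolean parallel) {
        AtomicInteger calls = new AtomicInteger();

        // A contract-abiding supplier: a fresh container on every call, plus a counter.
        Supplier<ArrayList<Integer>> supplier = () -> {
            calls.incrementAndGet();
            return new ArrayList<>();
        };

        Stream<Integer> stream = IntStream.range(0, 10_000).boxed();
        if (parallel) {
            stream = stream.parallel();
        }
        stream.collect(supplier, ArrayList::add, ArrayList::addAll);
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("sequential: " + countSupplierCalls(false));
        // In parallel, typically one container per partition; the exact number varies.
        System.out.println("parallel:   " + countSupplierCalls(true));
    }
}
```

A "supplier" that hands back one shared object would make every partition write into the same container, which is why it only appears to work when a single container is ever requested.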

The main work here is being done by the accumulator function. To accomplish a fold-style reduction, we can apply this function in a strict left-to-right order using forEachOrdered(). We have to do a bit of setup and finishing code before and after, but that's no problem:

StringBuilder a = new StringBuilder();
list.parallelStream()
    .forEachOrdered(b -> a.append(a.toString()).append(b));
System.out.println(a.toString());

Naturally, this works fine in parallel, though the performance benefits of running in parallel may be somewhat negated by the ordering requirements of forEachOrdered().

In summary, if you find yourself wanting to do a mutable reduction but you're lacking an associative combiner function, leading you to restrict your stream to sequential execution, recast the problem as a fold-left operation and use forEachOrdered() on your accumulator function.
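If this pattern comes up often, the forEachOrdered() setup and finishing code can be wrapped in a small helper. The following is a minimal sketch, not part of the Streams API: foldLeft and FoldLeftDemo are hypothetical names, and the helper assumes the caller passes in a fresh mutable container.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Stream;

public final class FoldLeftDemo {
    private FoldLeftDemo() {}

    /**
     * Applies the accumulator to each element in encounter order, one at a time,
     * mutating and then returning the given container. No combiner is needed.
     */
    static <T, A> A foldLeft(Stream<T> stream, A container,
                             BiConsumer<A, ? super T> accumulator) {
        stream.forEachOrdered(element -> accumulator.accept(container, element));
        return container;
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("a", "b", "c", "d", "e");

        StringBuilder result = foldLeft(
            list.parallelStream(),
            new StringBuilder(),
            (a, b) -> a.append(a.toString()).append(b));

        // Same text as the sequential collect() output shown earlier.
        System.out.println(result);
    }
}
```

The helper relies on the guarantee that forEachOrdered() processes elements in encounter order with each action completing before the next begins, so the non-thread-safe container is never touched concurrently.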

Stuart Marks answered Oct 11 '22 10:10


As @MarkoTopolnik and @Duncan observed in earlier comments, there is no guarantee that Collector.combiner() is called to produce the reduced result in sequential mode. In fact, the Javadoc is somewhat ambiguous on this point, which can lead to an incorrect interpretation:

(...) A parallel implementation would partition the input, create a result container for each partition, accumulate the contents of each partition into a subresult for that partition, and then use the combiner function to merge the subresults into a combined result.

According to NoBlogDefFound, the combiner is used only in parallel mode. See the partial quotation below:

combiner() is used to join two accumulators together into one. It is used when collector is executed in parallel, splitting input Stream and collecting parts independently first.
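The partition, accumulate, and combine steps from the Javadoc quotation can be walked through by hand using a collector's own functions. This is only an illustrative sketch: collectInTwoPartitions and ManualPartitionDemo are hypothetical names, the split into exactly two halves is an arbitrary choice, and the real streams implementation partitions and schedules work quite differently.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public final class ManualPartitionDemo {
    private ManualPartitionDemo() {}

    /** Mimics a two-way parallel reduction, but runs on the calling thread. */
    static <T, A, R> R collectInTwoPartitions(List<T> input, Collector<T, A, R> collector) {
        int mid = input.size() / 2;

        // One result container per partition, each filled independently.
        A left = accumulate(input.subList(0, mid), collector);
        A right = accumulate(input.subList(mid, input.size()), collector);

        // The combiner merges the two subresults; the finisher produces the final value.
        A merged = collector.combiner().apply(left, right);
        return collector.finisher().apply(merged);
    }

    private static <T, A, R> A accumulate(List<T> part, Collector<T, A, R> collector) {
        A container = collector.supplier().get();
        BiConsumer<A, T> accumulator = collector.accumulator();
        for (T element : part) {
            accumulator.accept(container, element);
        }
        return container;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        List<Integer> result = collectInTwoPartitions(numbers, Collectors.toList());
        System.out.println(result); // [1, 2, 3, 4, 5]
    }
}
```

With a single partition there would be nothing to merge, which is consistent with the combiner going unused when the current implementation evaluates a sequential pipeline.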

To show this more clearly, I rewrote the original code to use both approaches (sequential and parallel).


import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.stream.Collector;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public final class CollectorTest {
    private CollectorTest()
    {
    }

    // A combiner that fails loudly if it is ever invoked.
    private static <T> BinaryOperator<T> nope()
    {
        return (t, u) -> { throw new UnsupportedOperationException("nope"); };
    }

    public static void main(final String... args)
    {
        final Collector<Integer, ?, List<Integer>> c =
                Collector
                    .of(ArrayList::new, List::add, nope());

        // sequential approach
        Stream<Integer> sequential = IntStream
                .range(0, 10_000_000)
                .boxed();

        System.out.println("isParallel:" + sequential.isParallel());
        sequential
                .collect(c);

        // parallel approach
        Stream<Integer> parallel = IntStream
                .range(0, 10_000_000)
                .parallel()
                .boxed();

        System.out.println("isParallel:" + parallel.isParallel());
        parallel
                .collect(c);
    }
}

Running this code produces the following output:

isParallel:false
isParallel:true
Exception in thread "main" java.lang.UnsupportedOperationException: nope
    at com.stackoverflow.lambda.CollectorTest.lambda$nope$0(CollectorTest.java:18)
    at com.stackoverflow.lambda.CollectorTest$$Lambda$3/2001049719.apply(Unknown Source)
    at java.util.stream.ReduceOps$3ReducingSink.combine(ReduceOps.java:174)
    at java.util.stream.ReduceOps$3ReducingSink.combine(ReduceOps.java:160)

So, from this result we can infer that, in this implementation, a Collector's combiner is called only during parallel execution.

e2a answered Oct 11 '22 12:10