Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How collectors are used when turning the stream in parallel

I actually tried to answer this question How to skip even lines of a Stream<String> obtained from the Files.lines. So I though this collector wouldn't work well in parallel:

private static Collector<String, ?, List<String>> oddLines() {
    int[] counter = {1};
    return Collector.of(ArrayList::new,
            (l, line) -> {
                if (counter[0] % 2 == 1) l.add(line);
                counter[0]++;
            },
            (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            });
}

but it works.

EDIT: It didn't actually work; I got fooled by the fact that my input set was too small to trigger any parallelism; see discussion in comments.

I thought it wouldn't work because of the two following plans of executions comes to my mind.


1. The counter array is shared among all threads.

Thread t1 read the first element of the Stream, so the if condition is satisfied. It adds the first element to its list. Then the execution stops before he has the time to update the array value.

Thread t2, which says started at the 4th element of the stream add it to its list. So we end up with a non-wanted element.

Of course since this collector seems to works, I guess it doesn't work like that. And the updates are not atomic anyway.


2. Each Thread has its own copy of the array

In this case there is no more problems for the update, but nothing prevents me that the thread t2 will not start at the 4th element of the stream. So he doesn't work like that either.


So it seems that it doesn't work like that at all, which brings me to the question... how the collector is used in parallel?

Can someone explain me basically how it works and why my collector works when ran in parallel?

Thank you very much!

like image 962
user2336315 Avatar asked May 11 '15 15:05

user2336315


People also ask

Which method defined by collection is used to obtain a parallel stream?

To create a parallel stream from another stream, use the parallel() method. To create a parallel stream from a Collection use the parallelStream() method.

What is the use of collect method in stream?

We can use Stream collect() function to perform a mutable reduction operation and concatenate the list elements. The supplier function is returning a new StringBuilder object in every call. The accumulator function is appending the list string element to the StringBuilder instance.

When using parallel streams how many threads are used to process a streamed collection?

In case of Parallel stream,4 threads are spawned simultaneously and it internally using Fork and Join pool to create and manage threads.

How does a parallel stream work?

Normally any java code has one stream of processing, where it is executed sequentially. Whereas by using parallel streams, we can divide the code into multiple streams that are executed in parallel on separate cores and the final result is the combination of the individual outcomes.


1 Answers

Passing a parallel() source stream into your collector is enough to break the logic because your shared state (counter) may be incremented from different tasks. You can verify that, because it is never returning the correct result for any finite stream input:

    Stream<String> lines = IntStream.range(1, 20000).mapToObj(i -> i + "");
    System.out.println(lines.isParallel());
    lines = lines.parallel();
    System.out.println(lines.isParallel());

    List<String> collected = lines.collect(oddLines());

    System.out.println(collected.size());

Note that for infinite streams (e.g. when reading from Files.lines()) you need to generate some significant amount of data in the stream, so it actually forks a task to run some chunks concurrently.

Output for me is:

false
true
12386

Which is clearly wrong.


As @Holger in the comments correctly pointed out, there is a different race that can happen when your collector is specifying CONCURRENT and UNORDERED, in which case they operate on a single shared collection across tasks (ArrayList::new called once per stream), where-as with only parallel() it will run the accumulator on a collection per task and then later combine the result using your defined combiner.

If you'd add the characteristics to the collector, you might run into the following result due to the shared state in a single collection:

false
true
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 73
    at java.util.ArrayList.add(ArrayList.java:459)
    at de.jungblut.stuff.StreamPallel.lambda$0(StreamPallel.java:18)
    at de.jungblut.stuff.StreamPallel$$Lambda$3/1044036744.accept(Unknown Source)
    at java.util.stream.ReferencePipeline.lambda$collect$207(ReferencePipeline.java:496)
    at java.util.stream.ReferencePipeline$$Lambda$6/2003749087.accept(Unknown Source)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
    at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
    at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:496)
    at de.jungblut.stuff.StreamPallel.main(StreamPallel.java:32)12386
like image 58
Thomas Jungblut Avatar answered Sep 28 '22 09:09

Thomas Jungblut