
Doesn't Stream.parallel() update the characteristics of spliterator?

This question is based on the answers to the question "What is the difference between Stream.of and IntStream.range?"

Since IntStream.range produces an already sorted stream, the code below prints only 0:

IntStream.range(0, 4)
         .peek(System.out::println)
         .sorted()
         .findFirst();

The spliterator also has the SORTED characteristic. The code below prints true:

System.out.println(
    IntStream.range(0, 4)
             .spliterator()
             .hasCharacteristics(Spliterator.SORTED)
);

Now, if I add parallel() to the first snippet, then as expected the output contains all 4 numbers from 0 to 3, but in an arbitrary order, because the stream is not sorted anymore due to parallel().

IntStream.range(0, 4)
         .parallel()
         .peek(System.out::println)
         .sorted()
         .findFirst();

This produces something like the following (the order varies between runs):

2
0
1
3

So I expected the SORTED property to have been removed by parallel(). But the code below prints true as well.

System.out.println(
    IntStream.range(0, 4)
             .parallel()
             .spliterator()
             .hasCharacteristics(Spliterator.SORTED)
);

Why doesn't parallel() change the SORTED property? And since all four numbers are printed, how does Java realize that the stream is not sorted even though the SORTED property still exists?

Gautham M asked May 25 '21 03:05




2 Answers

How exactly this is done is very much an implementation detail, and you have to dig deep into the source code to really see why. Basically, parallel and sequential pipelines are just handled differently. Look at AbstractPipeline.evaluate, which checks isParallel() and then does different things depending on whether the pipeline is parallel:

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));

If you then look at SortedOps.OfInt, you'll see that it overrides two methods:

@Override
public Sink<Integer> opWrapSink(int flags, Sink sink) {
    Objects.requireNonNull(sink);

    if (StreamOpFlag.SORTED.isKnown(flags))
        return sink;
    else if (StreamOpFlag.SIZED.isKnown(flags))
        return new SizedIntSortingSink(sink);
    else
        return new IntSortingSink(sink);
}

@Override
public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
                                               Spliterator<P_IN> spliterator,
                                               IntFunction<Integer[]> generator) {
    if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
        return helper.evaluate(spliterator, false, generator);
    }
    else {
        Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator);

        int[] content = n.asPrimitiveArray();
        Arrays.parallelSort(content);

        return Nodes.node(content);
    }
}

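opWrapSink will eventually be called if it's a sequential pipeline, and opEvaluateParallel (as its name suggests) will be called when it's a parallel stream. Notice how opWrapSink doesn't do anything to the given sink if the pipeline is already sorted (it just returns it unchanged), so findFirst can stop after the first element. opEvaluateParallel, on the other hand, calls helper.evaluate on the spliterator in both branches, so every upstream element (and therefore every peek call) is processed before findFirst gets to pick one.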
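The difference can be observed from outside the JDK with a small experiment. The following is only a sketch: the class and helper names are made up for illustration, and how many elements peek sees in the parallel case is an implementation detail that may differ between JDK versions. Only the result of findFirst is guaranteed.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.IntStream;

public class SortedPeekDemo {

    // Runs range(0, 4).peek(...).sorted().findFirst() and records every
    // element that peek() observes. The helper name is illustrative only.
    public static int firstOfRange(boolean parallel, List<Integer> seen) {
        IntStream stream = IntStream.range(0, 4);
        if (parallel) {
            stream = stream.parallel();
        }
        return stream.peek(x -> seen.add(x)) // may run on several threads
                     .sorted()
                     .findFirst()
                     .getAsInt();
    }

    public static void main(String[] args) {
        // CopyOnWriteArrayList is thread-safe, so peek() can append concurrently.
        List<Integer> seenSequential = new CopyOnWriteArrayList<>();
        List<Integer> seenParallel = new CopyOnWriteArrayList<>();

        int a = firstOfRange(false, seenSequential);
        int b = firstOfRange(true, seenParallel);

        System.out.println("sequential: first=" + a + ", peeked=" + seenSequential);
        System.out.println("parallel:   first=" + b + ", peeked=" + seenParallel);
    }
}
```

In both cases findFirst returns 0; what changes is how much of the upstream pipeline had to run to get there.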

Also note that parallel-ness and sorted-ness are not mutually exclusive. You can have a stream with any combination of those characteristics.
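As a quick check of that independence, here is a minimal sketch that queries both properties for four streams. The class and method names are hypothetical; isParallel(), spliterator() and hasCharacteristics() are the standard API.

```java
import java.util.Spliterator;
import java.util.stream.BaseStream;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class ParallelSortedFlags {

    // Returns { isParallel, hasSortedCharacteristic } for the given stream.
    // Note that spliterator() is a terminal operation, so the stream is consumed.
    public static boolean[] flags(BaseStream<?, ?> stream) {
        boolean parallel = stream.isParallel();
        boolean sorted = stream.spliterator().hasCharacteristics(Spliterator.SORTED);
        return new boolean[] { parallel, sorted };
    }

    private static void print(String label, boolean[] f) {
        System.out.println(label + " parallel=" + f[0] + " sorted=" + f[1]);
    }

    public static void main(String[] args) {
        print("range:           ", flags(IntStream.range(0, 4)));
        print("range.parallel:  ", flags(IntStream.range(0, 4).parallel()));
        print("of(3,1,2):       ", flags(Stream.of(3, 1, 2)));
        print("of(...).parallel:", flags(Stream.of(3, 1, 2).parallel()));
    }
}
```

Calling parallel() flips only the first flag; the second one comes from the source spliterator and stays as it was.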

"Sorted" is a characteristic of a Spliterator. It's not technically a characteristic of a Stream (like "parallel" is). Sure, parallel() could create a stream with an entirely new spliterator (one that gets its elements from the original spliterator) with entirely new characteristics, but why do that when you can just reuse the same spliterator? I'd imagine you would have to handle parallel and sequential streams differently in any case.

Sweeper answered Oct 16 '22 22:10


You need to take a step back and think of how you would solve such a problem in general, considering that ForkJoinPool is used for parallel streams and it works based on work stealing. It would be very helpful if you knew how a Spliterator works, too. Some details here.

You have a certain Stream, you "split it" (very simplified) into smaller pieces and give all those pieces to a ForkJoinPool for execution. All of those pieces are worked on independently, by individual threads. Since we are talking about threads here, there is no guaranteed sequence of events between them; which piece is processed first is up to the scheduler (that is why you see the output in a random order).

If your stream preserves the order, the terminal operation is supposed to preserve it too. So while intermediate operations are executed in any order, your terminal operation (if the stream up to that point is ordered) will handle elements in an ordered fashion. To put it slightly simplified:

System.out.println(
    IntStream.of(1,2,3)
             .parallel()
             .map(x -> {System.out.println(x * 2); return x * 2;})
             .boxed()
             .collect(Collectors.toList()));

map will process elements in an unknown order (ForkJoinPool and threads, remember that), but collect will receive elements in order, "left to right".
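A slightly larger variant makes this easier to verify. This is a sketch under assumptions: the class and method names are invented, and the queue only records the order in which map happened to run, which will usually differ between runs.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class OrderedCollectDemo {

    // Doubles 0..n-1 in parallel. mapOrder records the order in which map()
    // was actually invoked; the returned list is what collect() produced.
    public static List<Integer> doubledInParallel(int n, Queue<Integer> mapOrder) {
        return IntStream.range(0, n)
                .parallel()
                .map(x -> {
                    mapOrder.add(x); // thread-safe queue, called from several threads
                    return x * 2;
                })
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Queue<Integer> mapOrder = new ConcurrentLinkedQueue<>();
        List<Integer> result = doubledInParallel(16, mapOrder);

        System.out.println("order in which map ran: " + mapOrder);
        System.out.println("collected list:         " + result);
    }
}
```

The collected list comes out in encounter order even when the recorded map order is scrambled.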


Now, if we extrapolate that to your example: when you invoke parallel, the stream is split into small pieces that are worked on separately. For example, look at how this one is split (a single time):

Spliterator<Integer> spliterator =
IntStream.of(5, 4, 3, 2, 1, 5, 6, 7, 8)
         .parallel()
         .boxed()
         .sorted()
         .spliterator()
         .trySplit(); // trySplit is invoked internally on parallel

spliterator.forEachRemaining(System.out::println);

On my machine it prints 1, 2, 3, 4. This means that the internal implementation splits the stream into two Spliterators, left and right: left has [1, 2, 3, 4] and right has the remaining [5, 5, 6, 7, 8]. But that is not all; these Spliterators can be split further. For example:

Spliterator<Integer> spliterator =
IntStream.of(5, 4, 3, 2, 1, 5, 6, 7, 8)
         .parallel()
         .boxed()
         .sorted()
         .spliterator()
         .trySplit()
         .trySplit()
         .trySplit();

spliterator.forEachRemaining(System.out::println);

If you try to invoke trySplit again, you will get null, meaning: that's it, I can't split anymore.
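The same splitting can be driven by hand on a plain list spliterator, outside of any stream pipeline. The sketch below (class and method names are made up) keeps calling trySplit until it returns null and collects the resulting leaves; how many leaves you get depends on the spliterator implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

public class SplitAllDemo {

    // Recursively splits s until trySplit() returns null, then drains each
    // unsplittable piece into its own list. Leaves are added left to right.
    public static void splitAll(Spliterator<Integer> s, List<List<Integer>> leaves) {
        Spliterator<Integer> prefix = s.trySplit();
        if (prefix == null) {
            List<Integer> leaf = new ArrayList<>();
            s.forEachRemaining(leaf::add);
            leaves.add(leaf);
            return;
        }
        splitAll(prefix, leaves); // left part first, to keep encounter order
        splitAll(s, leaves);      // then what remains on the right
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 5, 6, 7, 8);
        List<List<Integer>> leaves = new ArrayList<>();
        splitAll(source.spliterator(), leaves);
        System.out.println(leaves);
    }
}
```

Because the spliterator is ORDERED, concatenating the leaves from left to right gives back the original sequence; this is what lets an ordered terminal operation reassemble results correctly.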

So your Stream, IntStream.range(0, 4), is going to be split into 4 spliterators, each worked on individually by a thread. If the first thread knows that the Spliterator it is currently working on is the "left-most" one, that's it! The rest of the threads do not even need to start their work - the result is known.

On the other hand, it could be that the Spliterator that has the "left-most" element is started last. In that case the first three might already be done with their work (thus peek is invoked in your example), but they do not "produce" the needed result.
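This "left-most wins" behaviour can be seen with findFirst on a larger parallel stream. The following is a rough sketch with invented names; the counter only shows that other pieces may do some work too, and its value is timing-dependent.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class LeftMostDemo {

    // Finds the first x in encounter order with x % 7 == 3, in parallel.
    // 'tested' counts how many elements the filter looked at across all threads.
    public static int firstMatch(AtomicInteger tested) {
        return IntStream.range(0, 1_000_000)
                .parallel()
                .filter(x -> {
                    tested.incrementAndGet();
                    return x % 7 == 3;
                })
                .findFirst()
                .getAsInt();
    }

    public static void main(String[] args) {
        for (int run = 0; run < 5; run++) {
            AtomicInteger tested = new AtomicInteger();
            int result = firstMatch(tested);
            System.out.println("result=" + result + ", elements tested=" + tested.get());
        }
    }
}
```

The result is the same on every run, because only a match from the left-most piece that has one can be the answer; matches found earlier in pieces further to the right are discarded.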

As a matter of fact, this is how it is done internally. You do not need to understand the code, but the flow and the method names should be obvious.

Eugene answered Oct 17 '22 00:10