
Parallel Stream non-concurrent unordered collector

Suppose I have this custom collector:

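    import java.util.ArrayList;
    import java.util.EnumSet;
    import java.util.List;
    import java.util.Set;
    import java.util.function.BiConsumer;
    import java.util.function.BinaryOperator;
    import java.util.function.Function;
    import java.util.function.Supplier;
    import java.util.stream.Collector;

    public class CustomToListCollector<T> implements Collector<T, List<T>, List<T>> {

        @Override
        public Supplier<List<T>> supplier() {
            return ArrayList::new;
        }

        @Override
        public BiConsumer<List<T>, T> accumulator() {
            return List::add;
        }

        // Appends the right-hand partial result to the left-hand one
        @Override
        public BinaryOperator<List<T>> combiner() {
            return (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            };
        }

        @Override
        public Function<List<T>, List<T>> finisher() {
            return Function.identity();
        }

        // Same as Collectors#toList, plus UNORDERED
        @Override
        public Set<Characteristics> characteristics() {
            return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
        }
    }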

This mirrors the Collectors#toList implementation with one minor difference: the UNORDERED characteristic is added as well.

I would assume that running this code:

    List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

    for (int i = 0; i < 100_000; i++) {
        List<Integer> result = list.parallelStream().collect(new CustomToListCollector<>());
        if (!result.equals(list)) {
            System.out.println(result);
            break;
        }
    }

should eventually print a result whose order differs from the source list. But it never does.

I've looked under the hood a bit. ReferencePipeline#collect first checks whether the stream is parallel, whether the collector is concurrent, and whether the collector is unordered. The CONCURRENT characteristic is missing, so it delegates to the evaluate method, passing a TerminalOp created from this collector. Under the hood this is a ReduceOp backed by a ReducingSink, and it does take into account whether the collector is unordered:

    return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
        @Override
        public ReducingSink makeSink() {
            return new ReducingSink();
        }

        @Override
        public int getOpFlags() {
            return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                   ? StreamOpFlag.NOT_ORDERED
                   : 0;
        }
    };
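The check described before the snippet can be paraphrased roughly as follows. This is a simplified sketch, not the JDK source: the class and method names are made up for illustration, and the detail that an already unordered stream also satisfies the last condition is an assumption based on reading the OpenJDK 8 sources.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class CollectPathSketch {

    // Hypothetical helper: would collect() skip the ReduceOp and accumulate
    // into one shared container? Paraphrase only, not the real implementation.
    public static boolean usesConcurrentPath(boolean parallel,
                                             boolean streamOrdered,
                                             Collector<?, ?, ?> collector) {
        Set<Collector.Characteristics> ch = collector.characteristics();
        return parallel
                && ch.contains(Collector.Characteristics.CONCURRENT)
                && (!streamOrdered || ch.contains(Collector.Characteristics.UNORDERED));
    }

    public static void main(String[] args) {
        Collector<Integer, ?, List<Integer>> plain = Collectors.toList();
        Collector<Integer, ?, ConcurrentMap<Integer, Integer>> concurrent =
                Collectors.toConcurrentMap(x -> x, x -> x);

        // toList() is not CONCURRENT, so the ordinary reduction is used
        System.out.println(usesConcurrentPath(true, true, plain));
        // toConcurrentMap() reports CONCURRENT and UNORDERED
        System.out.println(usesConcurrentPath(true, true, concurrent));
    }
}
```

A collector that is UNORDERED but not CONCURRENT, like the one in this question, fails the second condition and therefore always ends up in the ReduceOp shown above.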

I have not debugged further since it gets pretty complicated fast.

So maybe there is a shortcut here, and someone can explain what I am missing. This is a parallel stream that collects elements into a non-concurrent, unordered collector. Shouldn't the order in which the threads combine their results be arbitrary? If not, how is the order imposed here, and by whom?

Eugene asked Nov 30 '16 08:11


1 Answer

Note that the result is the same when using list.parallelStream().unordered().collect(Collectors.toList()); in either case, the unordered property is not used within the current implementation.
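A minimal sketch of that variant, reusing the loop from the question. The class and method names are made up for illustration, and the outcome reflects observed behavior of current implementations rather than anything the specification promises.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class UnorderedToListCheck {

    // Returns the first result that differs from the source list,
    // or null if no such result showed up within the given attempts.
    public static List<Integer> firstDeviation(List<Integer> list, int attempts) {
        for (int i = 0; i < attempts; i++) {
            List<Integer> result = list.parallelStream()
                    .unordered()                      // drop the ORDERED flag on the stream itself
                    .collect(Collectors.toList());    // plain, non-concurrent collector
            if (!result.equals(list)) {
                return result;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println(firstDeviation(list, 100_000));
    }
}
```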

But let’s change the setup a little bit:

    List<Integer> list = Collections.nCopies(10, null).stream()
        .flatMap(ig -> IntStream.range(0, 100).boxed())
        .collect(Collectors.toList());
    List<Integer> reference = new ArrayList<>(new LinkedHashSet<>(list));

    for (int i = 0; i < 100_000; i++) {
        List<Integer> result = list.parallelStream()
            .distinct()
            .collect(characteristics(Collectors.toList(), Collector.Characteristics.UNORDERED));
        if (!result.equals(reference)) {
            System.out.println(result);
            break;
        }
    }

This uses the characteristics collector factory from this answer.

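The characteristics method is not part of the JDK. A minimal sketch of what such a factory could look like is shown here; it is an assumption about its shape, not the code from the linked answer, and the enclosing class name is made up.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CollectorCharacteristics {

    // Hypothetical helper: wraps an existing collector and reports its
    // original characteristics plus the extra ones passed in.
    public static <T, A, R> Collector<T, A, R> characteristics(
            Collector<T, A, R> base, Collector.Characteristics... extra) {
        EnumSet<Collector.Characteristics> all =
                EnumSet.noneOf(Collector.Characteristics.class);
        all.addAll(base.characteristics());
        Collections.addAll(all, extra);
        // Reuse the four functions of the base collector unchanged
        return Collector.of(base.supplier(), base.accumulator(),
                base.combiner(), base.finisher(),
                all.toArray(new Collector.Characteristics[0]));
    }

    public static void main(String[] args) {
        Collector<Integer, ?, List<Integer>> unorderedToList =
                characteristics(Collectors.toList(), Collector.Characteristics.UNORDERED);
        System.out.println(unorderedToList.characteristics());
        System.out.println(Stream.of(1, 2, 3).collect(unorderedToList));
    }
}
```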
The interesting outcome is that Java 8 versions prior to 1.8.0_60 produce a different result. If we use objects with distinct identities instead of the canonical Integer instances, we can detect that in these earlier versions not only does the order of the list differ, but the objects in the result list are also not the first encountered instances.

So the unordered characteristic of a terminal operation was propagated to the stream, affecting the behavior of distinct(), similar to that of skip and limit, as discussed here and here.
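One way to check for first encountered instances is to compare by identity rather than by equals. The sketch below uses a made-up Key class and the plain ordered Collectors.toList(), for which distinct() is documented to be stable; swapping in an UNORDERED collector on an old runtime is how the difference described above could be probed.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.stream.Collectors;

public class DistinctIdentityCheck {

    // Hypothetical value type: equal by value, but each instance has its own identity
    public static final class Key {
        final int value;
        Key(int value) { this.value = value; }
        @Override public boolean equals(Object o) {
            return o instanceof Key && ((Key) o).value == value;
        }
        @Override public int hashCode() { return value; }
    }

    // Ten copies of the values 0..99, every element a fresh instance
    public static List<Key> source() {
        List<Key> list = new ArrayList<>();
        for (int copy = 0; copy < 10; copy++) {
            for (int v = 0; v < 100; v++) {
                list.add(new Key(v));
            }
        }
        return list;
    }

    // True if both lists hold the very same instances at the same positions
    public static boolean sameInstances(List<Key> a, List<Key> b) {
        if (a.size() != b.size()) return false;
        for (int i = 0; i < a.size(); i++) {
            if (a.get(i) != b.get(i)) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        List<Key> list = source();
        // LinkedHashSet keeps the first added instance of each equal key
        List<Key> reference = new ArrayList<>(new LinkedHashSet<>(list));
        List<Key> result = list.parallelStream().distinct().collect(Collectors.toList());
        System.out.println(sameInstances(result, reference));
    }
}
```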

As discussed in the second linked thread, the back-propagation has been removed completely, which is reasonable on second thought. For distinct, skip and limit, the order of the source is relevant, and ignoring it just because the order will be ignored in subsequent stages is not right. So the only remaining stateful intermediate operation that could benefit from back-propagation would be sorted, which becomes obsolete when the order is ignored afterwards. But combining sorted with an unordered sink is more like a programming error anyway…

For stateless intermediate operations the order is irrelevant anyway. The stream processing works by splitting the source into chunks, applying all stateless intermediate operations to their elements independently and collecting into a local container, before merging into the result container. So the merging step is the only place where respecting or ignoring the order (of the chunks) will have an impact on the result and perhaps on the performance.
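The split, collect locally, then merge scheme can be imitated by hand on a single thread. The sketch below is an illustration under simplifying assumptions (one split, no intermediate operations, made-up names), not what the stream implementation literally does; the inOrder flag only exists to show that the merge is where chunk order enters the result.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class ChunkedCollectSketch {

    // Splits the source once, fills one local container per chunk,
    // then merges the two containers with the collector's combiner.
    public static <T, A, R> R collectInTwoChunks(List<T> source,
                                                 Collector<T, A, R> collector,
                                                 boolean inOrder) {
        Spliterator<T> suffix = source.spliterator();
        Spliterator<T> prefix = suffix.trySplit();   // may be null if the source cannot be split

        A left = collector.supplier().get();
        A right = collector.supplier().get();
        if (prefix != null) {
            prefix.forEachRemaining(t -> collector.accumulator().accept(left, t));
        }
        suffix.forEachRemaining(t -> collector.accumulator().accept(right, t));

        // The only step where the order of the chunks matters
        A merged = inOrder
                ? collector.combiner().apply(left, right)
                : collector.combiner().apply(right, left);
        return collector.finisher().apply(merged);
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println(collectInTwoChunks(list, Collectors.toList(), true));
        System.out.println(collectInTwoChunks(list, Collectors.toList(), false));
    }
}
```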

But the impact isn’t very big. When you implement such an operation, e.g. via ForkJoinTasks, you simply split a task into two, wait for their completion and merge them. Alternatively, a task may split off a chunk into a sub-task, process its remaining chunk in-place, wait for the sub-task and merge. In either case, merging the results in order comes naturally, because the initiating task already holds references to the adjacent tasks. To merge with different chunks instead, the associated sub-tasks would first have to be found somehow.
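The second variant (fork one half, process the other half in-place, join, merge) might look like this with a RecursiveTask. It is a sketch with a made-up class name and an arbitrary threshold, not the stream implementation; the point is that the task already holds both partial results, so merging them in order costs nothing extra.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class OrderedMergeTask<T> extends RecursiveTask<List<T>> {

    private static final int THRESHOLD = 4;   // arbitrary chunk size for illustration
    private final List<T> chunk;

    public OrderedMergeTask(List<T> chunk) {
        this.chunk = chunk;
    }

    @Override
    protected List<T> compute() {
        if (chunk.size() <= THRESHOLD) {
            return new ArrayList<>(chunk);     // local container for a small chunk
        }
        int mid = chunk.size() / 2;
        OrderedMergeTask<T> left = new OrderedMergeTask<>(chunk.subList(0, mid));
        OrderedMergeTask<T> right = new OrderedMergeTask<>(chunk.subList(mid, chunk.size()));

        left.fork();                            // hand one half to the pool
        List<T> rightResult = right.compute();  // process the other half in-place
        List<T> leftResult = left.join();       // wait for the forked half

        leftResult.addAll(rightResult);         // both neighbours are at hand: merge in order
        return leftResult;
    }

    // Convenience entry point running the task on the common pool
    public static <T> List<T> collect(List<T> source) {
        return ForkJoinPool.commonPool().invoke(new OrderedMergeTask<T>(source));
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            source.add(i);
        }
        System.out.println(collect(source).equals(source));
    }
}
```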

The only benefit of merging with a different task would be that you can merge with the first completed task, if the tasks take different amounts of time to complete. But when waiting for a sub-task in the Fork/Join framework, the thread won’t be idle; the framework will use the thread to work on other pending tasks in the meantime. So as long as the main task has been split into enough sub-tasks, there will be full CPU utilization. Also, the spliterators attempt to split into even chunks to reduce the differences between the computing times. It’s very likely that the benefit of an alternative unordered merging implementation doesn’t justify the code duplication, at least with the current implementation.

Still, reporting the unordered characteristic allows an implementation to exploit it when beneficial, and implementations can change.

Holger answered Sep 28 '22 01:09