
Does a stateful map operation on an ordered stream process elements in a deterministic way?

I'm reading about the Java Streams API and I encountered the following here:

The operation forEachOrdered processes elements in the order specified by the stream, regardless of whether the stream is executed in serial or parallel. However, when a stream is executed in parallel, the map operation processes elements of the stream specified by the Java runtime and compiler. Consequently, the order in which the lambda expression e -> { parallelStorage.add(e); return e; } adds elements to the List parallelStorage can vary every time the code is run. For deterministic and predictable results, ensure that lambda expression parameters in stream operations are not stateful.

I tested the following code and it does in fact behave as described:

import java.util.ArrayList;
import java.util.List;

public class MapOrdering {

  public static void main(String[] args) {
    // Shared, non-thread-safe list that the stateful lambda below mutates.
    List<String> serialStorage = new ArrayList<>();

    System.out.println("Serial stream:");
    int j = 0;
    List<String> listOfIntegers = new ArrayList<>();
    for (int i = 0; i < 10; i++) listOfIntegers.add(String.valueOf(i));

    // Stateful lambda in a parallel stream: this is the behaviour being asked about.
    listOfIntegers.stream().parallel().map(e -> {
      serialStorage.add(e.concat(String.valueOf(j)));
      return e;
    }).forEachOrdered(k -> System.out.println(k));
    /*
    // Don't do this! It uses a stateful lambda expression.
    .map(e -> { serialStorage.add(e); return e; })*/

    for (String s : serialStorage) System.out.println(s);
  }
}

Output:

Serial stream: 0 1 2 3 4 5 6 7 8 9 null null 80 90 50 40 30 00

Questions:

  1. The output changes every time I run this. How do I make sure that the stateful map operation is executed in order?
  2. map is an intermediate operation and it does not start processing elements until the terminal operation commences. Since the terminal operation is ordered, why is the map operation unordered, and why do its results tend to change every time when working with a stateful operation?
amarnath harish asked Aug 23 '18 13:08



2 Answers

You would be lucky to see serialStorage contain all the elements you expect: you are adding elements from multiple threads to an ArrayList, which is not thread-safe. You could just as easily see nulls, or a List that is missing elements. But even if you use a thread-safe List, there is absolutely no order that you can rely on in that List.

This is explicitly mentioned in the documentation under side-effects, and intermediate operations should be side effect-free.

Basically there are two orderings: processing order (of intermediate operations) and encounter order. The latter is preserved (if the stream has one to begin with and intermediate operations don't break it, for example unordered or sorted).

Processing order is not specified, meaning intermediate operations may process elements in whatever order they like. Encounter order (the one you see from a terminal operation) will preserve the initial order.

But even terminal operations don't have to preserve the initial order, for example forEach (as opposed to forEachOrdered), or when you collect to a Set. Of course, read the documentation; it usually states this aspect clearly.
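As a rough illustration of the difference, here is a minimal sketch that builds the result through the stream's own collect step instead of a side effect. The class name EncounterOrderDemo and the helper mapInParallel are made up for this example. It assumes the source is an ordered List, in which case Collectors.toList() keeps encounter order even when the stream runs in parallel.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class EncounterOrderDemo {

  // Hypothetical helper: no shared state is touched, the stream assembles the list itself.
  static List<String> mapInParallel(List<String> input) {
    return input.parallelStream()
        .map(e -> e.concat("0"))        // stateless: the result depends only on the argument
        .collect(Collectors.toList());  // keeps encounter order for an ordered source
  }

  public static void main(String[] args) {
    List<String> input = IntStream.range(0, 10)
        .mapToObj(String::valueOf)
        .collect(Collectors.toList());

    // Prints [00, 10, 20, 30, 40, 50, 60, 70, 80, 90] on every run.
    System.out.println(mapInParallel(input));
  }
}
```

The elements may still be mapped in any order across threads; only the assembled result follows encounter order.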

Eugene answered Sep 30 '22 04:09



I would like to answer your 2 questions, while adding to this other answer...

  1. The output changes every time I run this. How do I write code to process a stateful map operation in an ordered way?

Stateful map operations are discouraged and you shouldn't use them, even for sequential streams. If you want that behaviour, you'd better use an imperative approach.
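One way the imperative approach might look for the code in the question is sketched below. The class name ImperativeDemo and the helper copyWithSuffix are invented for illustration; the loop does the same concatenation as the lambda in the question, but on a single thread.

```java
import java.util.ArrayList;
import java.util.List;

public class ImperativeDemo {

  // Hypothetical helper: a plain loop, so elements are added in list order.
  static List<String> copyWithSuffix(List<String> input, int j) {
    List<String> storage = new ArrayList<>();
    for (String e : input) {
      storage.add(e.concat(String.valueOf(j)));
    }
    return storage;
  }

  public static void main(String[] args) {
    List<String> listOfIntegers = new ArrayList<>();
    for (int i = 0; i < 10; i++) listOfIntegers.add(String.valueOf(i));

    // Prints 00, 10, 20, ... 90, one per line, in the same order on every run.
    for (String s : copyWithSuffix(listOfIntegers, 0)) System.out.println(s);
  }
}
```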

  2. map is an intermediate operation and it does not start processing elements until the terminal operation commences. Since the terminal operation is ordered, why is the map operation unordered, and why do its results tend to change every time when working with a stateful operation?

Only forEachOrdered respects encounter order of elements; intermediate operations (such as map) are not compelled to do so. For a parallel stream, this means that intermediate operations are allowed to be executed in any order by the pipeline, thus taking advantage of parallelism.

However, bear in mind that if you provide a stateful argument to an intermediate operation (i.e. a stateful mapper function to the map operation) and the stream is parallel, you have to manually synchronize the state kept by that argument (i.e. use a synchronized view of the list, implement some locking mechanism, etc.). This in turn affects performance negatively since, as stated in the docs, you risk having contention undermine the parallelism you are seeking to benefit from.
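To make the synchronization point concrete, here is a small sketch using Collections.synchronizedList. The class name SynchronizedStorageDemo and the helper recordProcessingOrder are assumptions made for this example. With the synchronized view no element is lost and no null appears, but the order of the recorded elements is still unspecified.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SynchronizedStorageDemo {

  // Hypothetical helper: records the order in which map happened to process elements.
  static List<String> recordProcessingOrder(List<String> input) {
    // Synchronized view: concurrent add calls are safe, but each one takes a lock.
    List<String> storage = Collections.synchronizedList(new ArrayList<>());
    input.parallelStream()
        .map(e -> { storage.add(e); return e; })  // still a stateful lambda, still discouraged
        .forEachOrdered(e -> { });                // terminal operation drives the pipeline
    return storage;
  }

  public static void main(String[] args) {
    List<String> input = IntStream.range(0, 10)
        .mapToObj(String::valueOf)
        .collect(Collectors.toList());

    // All ten elements are present, but their order can differ from run to run.
    System.out.println(recordProcessingOrder(input));
  }
}
```

This fixes the data race only; if the order matters, the list should come from the stream's result rather than from a side effect.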

Edit: for a terminal operation like forEachOrdered, parallelism usually brings little benefit, since it often needs to do some internal processing (for example, buffering elements) to comply with the requirement of respecting encounter order.

fps answered Sep 30 '22 03:09
