
Java lambdas, stateless lambdas and parallel execution

While trying to learn Java lambdas, I came across an article (linked below) whose section on the limitations of the Stream API states: "Stateful lambdas are usually not a problem when executing sequentially, but when the stream execution is parallelized, it breaks". The author then gives this code as an example of problems due to execution order:

List<String> ss = ...;
List<String> result = ...;

Stream<String> stream = ss.stream();

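stream.map(s -> {
    // Stateful: the outcome depends on how many elements earlier calls already added.
    synchronized (result) {
      if (result.size() < 10) {
        result.add(s);
      }
    }
    return s; // map needs a return value, otherwise the lambda does not compile
})
.forEach(e -> { });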

I can see how this would be non-deterministic if it were parallelized, but what I can't see is how you would fix this with stateless lambdas. Isn't there something inherently non-deterministic about adding things to a list in a parallel fashion? An example that a six-year-old in a hat could understand, perhaps in C#, would be much appreciated.

Link to original article: http://blog.hartveld.com/2013/03/jdk-8-33-stream-api.html

John Powell asked May 01 '14 15:05





1 Answer

I think I see what you are getting at with your question, and I will do my best to explain.

Consider an input list consisting of 8 elements:

[1, 2, 3, 4, 5, 6, 7, 8]

Assume that streams parallelize it in the following way. In reality they do not, and the exact process of parallelization is quite difficult to understand.
For now, though, assume that they keep dividing the input in half until chunks of two elements remain.

The branching division would look like this:

  1. First division:

    [1, 2, 3, 4]
    [5, 6, 7, 8]

  2. Second division:

    [1, 2]
    [3, 4]
    [5, 6]
    [7, 8]

Now we have four chunks that will (in our theory) be processed by four different threads, which have no knowledge of each other.
This can indeed be fixed by synchronizing on some external resource, but then you lose the benefits of parallelization. So assume that we do not synchronize: without synchronization, a thread is not guaranteed to see what the other threads have done, so our result will be garbage.
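
The halving described above can be sketched in a few lines. This is only an illustration of the simplified model in this answer, not of how the stream library actually splits its work; the class name ChunkSplit and the split helper are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ChunkSplit {
    // Hypothetical helper: recursively halve the input until every chunk
    // holds at most minSize elements, keeping the chunks in source order.
    static <T> List<List<T>> split(List<T> input, int minSize) {
        List<List<T>> chunks = new ArrayList<>();
        if (input.size() <= minSize) {
            chunks.add(new ArrayList<>(input));
            return chunks;
        }
        int mid = input.size() / 2;
        chunks.addAll(split(input.subList(0, mid), minSize));
        chunks.addAll(split(input.subList(mid, input.size()), minSize));
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        // prints [[1, 2], [3, 4], [5, 6], [7, 8]]
        System.out.println(split(input, 2));
    }
}
```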

Now on to the part of your question about statelessness: how can the stream then be processed in parallel correctly? How can elements that are processed in parallel be added to a list in the correct order?

First, assume a simple mapping function, the lambda i -> i + 10, whose results are then printed with System.out::println in a forEach.

Now after the second division the following will occur:

[1, 2] -> [11, 12] -> { System.out.println(11); System.out.println(12); }
[3, 4] -> [13, 14] -> { System.out.println(13); System.out.println(14); }
[5, 6] -> [15, 16] -> { System.out.println(15); System.out.println(16); }
[7, 8] -> [17, 18] -> { System.out.println(17); System.out.println(18); }

There is no guarantee about the order across threads. The only ordering you get is that elements handled by the same thread are processed in sequence, and even that is internal behavior you should not rely on.

If you want them processed in order, use forEachOrdered, which makes the final action run in the stream's encounter order. You do not lose much of the parallelization benefit, because the ordering constraint applies only to the final step.
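
As a rough sketch of the difference, the following compares forEach and forEachOrdered on a parallel stream. The class name OrderedDemo and the helper seenWithForEachOrdered are invented for illustration, and the synchronized list exists only so the observed order can be inspected afterwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class OrderedDemo {
    // Hypothetical helper: maps in parallel and records the order in which
    // the terminal action received the results.
    static List<Integer> seenWithForEachOrdered(List<Integer> input) {
        // Synchronized only so that we can safely observe the order.
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        input.parallelStream()
             .map(i -> i + 10)           // stateless: depends only on i
             .forEachOrdered(seen::add); // runs in encounter order
        return seen;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

        // forEach: all eight values are printed, but the order may differ per run.
        input.parallelStream().map(i -> i + 10).forEach(System.out::println);

        // forEachOrdered: the results arrive as 11, 12, ..., 18.
        System.out.println(seenWithForEachOrdered(input));
    }
}
```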

To see how items can be added to a list in parallel, look at what Collectors.toList() does. It supplies operations for:

  • Creating a new list.
  • Adding a value to the list.
  • Merging two lists.

Now the following will happen after the second division:

Each of the four threads does the following (only one thread is shown here):

  1. We had [1, 2].
  2. We map it to [11, 12].
  3. We create an empty List<Integer>.
  4. We add 11 to the list.
  5. We add 12 to the list.

Now all threads have done this, and we have four lists of two elements.

Now the following merges occur in the specified order:

  1. [11, 12] ++ [13, 14] = [11, 12, 13, 14]
  2. [15, 16] ++ [17, 18] = [15, 16, 17, 18]
  3. Finally [11, 12, 13, 14] ++ [15, 16, 17, 18] = [11, 12, 13, 14, 15, 16, 17, 18]

And thus the resulting list is in order, and the mapping has been done in parallel. You should now also be able to see why parallelization needs a larger minimum chunk size than just two items: otherwise creating and merging the new lists becomes too expensive.
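
The create, add, and merge steps above correspond to the three arguments of Stream.collect. A minimal sketch, with the class and method names (CollectDemo, collectManually, collectWithToList) made up for the example, shows the spelled-out form next to Collectors.toList():

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class CollectDemo {
    // The three operations written out by hand.
    static List<Integer> collectManually(List<Integer> input) {
        return input.parallelStream()
                    .map(i -> i + 10)
                    .collect(
                        () -> new ArrayList<Integer>(),       // create a new list
                        (list, value) -> list.add(value),     // add a value to the list
                        (left, right) -> left.addAll(right)); // merge two lists
    }

    // The same pipeline using the ready-made collector.
    static List<Integer> collectWithToList(List<Integer> input) {
        return input.parallelStream()
                    .map(i -> i + 10)
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println(collectManually(input));
        System.out.println(collectWithToList(input));
    }
}
```

Neither lambda touches anything outside its own arguments, so no synchronization is needed: each partial list is filled by one thread, and the merges put the partial lists back together in encounter order.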

I hope you now understand why stream operations should be stateless to get the full benefits of parallelization.
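
For the snippet in the question, one possible stateless rewrite is sketched below. It assumes the intent of the article's example is "keep the first ten strings", which is my reading rather than something the article states; the names FirstTen and firstTen are invented for the example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class FirstTen {
    // No shared list and no synchronized block: limit and collect do the work.
    static List<String> firstTen(List<String> ss) {
        return ss.parallelStream()
                 .limit(10)                     // first ten in encounter order
                 .collect(Collectors.toList()); // per-chunk lists, merged in order
    }

    public static void main(String[] args) {
        // Sample input s0, s1, ..., s24 (made up for the demonstration).
        List<String> ss = IntStream.range(0, 25)
                                   .mapToObj(i -> "s" + i)
                                   .collect(Collectors.toList());
        // prints [s0, s1, s2, s3, s4, s5, s6, s7, s8, s9]
        System.out.println(firstTen(ss));
    }
}
```

The result is the same on every run because nothing in the pipeline depends on which thread gets there first. Note that limit can be relatively expensive on ordered parallel streams, so this is about correctness, not a promise of speed.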

skiwi answered Oct 13 '22 19:10
