
What is the danger of side effects in Java 8 Streams?

I'm trying to understand the warnings I found in the documentation on Streams. I've gotten into the habit of using forEach() as a general-purpose iterator, and that's led me to write this type of code:

public class FooCache {
    private static Map<Integer, Integer> sortOrderCache = new ConcurrentHashMap<>();
    private static Map<Integer, String> codeNameCache = new ConcurrentHashMap<>();

    public static void populateCache() {
        List<Foo> myThings = getThings();

        myThings.forEach(thing -> {
            sortOrderCache.put(thing.getId(), thing.getSortOrder());
            codeNameCache.put(thing.getId(), thing.getCodeName());
        });
    }
}

This is a trivialized example. I understand that this code violates Oracle's warning against stateful lambdas and side effects. But I don't understand why this warning exists.

When running this code it appears to behave as expected. So how do I break this to demonstrate why it's a bad idea?

In short, I read this:

If executed in parallel, the non-thread-safety of ArrayList would cause incorrect results, and adding needed synchronization would cause contention, undermining the benefit of parallelism.

But can anyone add clarity to help me understand the warning?

GridDragon asked Oct 31 '17 17:10



2 Answers

From the Javadoc:

Note also that attempting to access mutable state from behavioral parameters presents you with a bad choice with respect to safety and performance; if you do not synchronize access to that state, you have a data race and therefore your code is broken, but if you do synchronize access to that state, you risk having contention undermine the parallelism you are seeking to benefit from. The best approach is to avoid stateful behavioral parameters to stream operations entirely; there is usually a way to restructure the stream pipeline to avoid statefulness.

The problem here is that if you access mutable state, you lose on two fronts:

  • Safety, because you need synchronization, which is exactly what the Stream API tries to minimize
  • Performance, because the required synchronization costs you (in your example, using a ConcurrentHashMap has a cost).

Now, in your example, there are several points to note:

  • If you want a multi-threaded stream, you need to call parallelStream(), as in myThings.parallelStream(). As it stands, the forEach you call is Iterable.forEach (inherited by java.util.Collection), which is a plain sequential loop and does not involve a Stream at all.
  • You mutate static maps from inside the lambda. A plain HashMap would not be thread-safe there; a ConcurrentHashMap is, at the cost mentioned above.
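To make the first point concrete, here is a small sketch that records which threads actually run the lambda. The class and method names are hypothetical, and the concurrent set is itself a side effect, used here only for observation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class; none of these names come from the question.
class ForEachThreadsDemo {

    // Names of the threads that run the lambda when forEach is called
    // directly on the list (this is Iterable.forEach, no Stream involved).
    static Set<String> threadsUsedByListForEach(List<Integer> items) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        items.forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    // Same measurement, but going through a parallel stream.
    static Set<String> threadsUsedByParallelStream(List<Integer> items) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        items.parallelStream().forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            items.add(i);
        }
        // Only the calling thread shows up here.
        System.out.println("list.forEach:   " + threadsUsedByListForEach(items));
        // Usually several threads; the exact set depends on the machine.
        System.out.println("parallelStream: " + threadsUsedByParallelStream(items));
    }
}
```

This is why the code in the question appears to behave: nothing in it runs concurrently, so the warning about data races never gets a chance to bite.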

In the lambda, and in the case of a Stream, you must not mutate the source of your stream:

myThings.stream().forEach(thing -> myThings.remove(thing));

With an ArrayList, this will most likely throw a ConcurrentModificationException, and the parallel version is even less predictable:

myThings.parallelStream().forEach(thing -> myThings.remove(thing));

That's because the ArrayList is not thread safe.

If you use a synchronized view (Collections.synchronizedList), then you take a performance hit because you synchronize on each access.
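A minimal sketch of that trade-off, assuming a list of integers and a hypothetical filter (e >= 100): wrapping the target list in Collections.synchronizedList makes the side-effecting lambda correct, but every add() from every worker thread goes through the same lock.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class, for illustration only.
class SynchronizedSideEffectDemo {

    // Correct only because each add() takes the wrapper's lock;
    // the parallel workers therefore queue up on that single lock.
    static List<Integer> matchWithSynchronizedList(List<Integer> elements) {
        List<Integer> matched = Collections.synchronizedList(new ArrayList<>());
        elements.parallelStream().forEach(e -> {
            if (e >= 100) {
                matched.add(e); // contended: one lock shared by all threads
            }
        });
        return matched; // encounter order is NOT preserved
    }

    public static void main(String[] args) {
        List<Integer> elements = IntStream.range(0, 10_000)
                                          .boxed()
                                          .collect(Collectors.toList());
        // The size is reliable (9900), the element order is not.
        System.out.println(matchWithSynchronizedList(elements).size());
    }
}
```

The result has the right size, but the work that was supposed to run in parallel is partly serialized, which is the "bad choice" the Javadoc describes.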

In your example, you would rather use something like this (Collectors.toMap rather than groupingBy, since each id maps to a single value):

sortOrderCache = myThings.stream()
                         .collect(Collectors.toMap(
                           Foo::getId, Foo::getSortOrder));
codeNameCache = myThings.stream()
                        .collect(Collectors.toMap(
                          Foo::getId, Foo::getCodeName));

The collector (here toMap) does the work you were doing by hand. If the stream is parallel, it may be split across several threads; each thread then accumulates into its own partial map, and the partial maps are merged at the end. Note that toMap throws an IllegalStateException if two elements share the same id.
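For reference, a self-contained sketch of building both maps without side effects. It uses Collectors.toMap (one value per id), and the Foo class below is a hypothetical stand-in, since the question does not show the real one:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for the Foo type used in the question.
class Foo {
    private final int id;
    private final int sortOrder;
    private final String codeName;

    Foo(int id, int sortOrder, String codeName) {
        this.id = id;
        this.sortOrder = sortOrder;
        this.codeName = codeName;
    }

    int getId() { return id; }
    int getSortOrder() { return sortOrder; }
    String getCodeName() { return codeName; }
}

// Hypothetical helper class; the names are for illustration only.
class FooCacheSketch {

    // No shared mutable state: each worker fills its own partial map,
    // and the stream framework merges the partial maps.
    static Map<Integer, Integer> sortOrders(List<Foo> things) {
        return things.parallelStream()
                     .collect(Collectors.toMap(Foo::getId, Foo::getSortOrder));
    }

    static Map<Integer, String> codeNames(List<Foo> things) {
        return things.parallelStream()
                     .collect(Collectors.toMap(Foo::getId, Foo::getCodeName));
    }

    public static void main(String[] args) {
        List<Foo> things = Arrays.asList(
                new Foo(1, 10, "alpha"),
                new Foo(2, 20, "beta"),
                new Foo(3, 30, "gamma"));
        System.out.println(sortOrders(things));
        System.out.println(codeNames(things));
    }
}
```

Because the lambda-free pipeline never touches anything outside the stream, switching between stream() and parallelStream() does not change the result.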

By the way, you could drop codeNameCache and sortOrderCache altogether and simply store a single id -> Foo mapping.
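A minimal sketch of that single-map idea, again with a hypothetical stand-in for Foo and a hypothetical helper name (byId):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-in for the Foo type used in the question.
class Foo {
    private final int id;
    private final int sortOrder;
    private final String codeName;

    Foo(int id, int sortOrder, String codeName) {
        this.id = id;
        this.sortOrder = sortOrder;
        this.codeName = codeName;
    }

    int getId() { return id; }
    int getSortOrder() { return sortOrder; }
    String getCodeName() { return codeName; }
}

// Hypothetical helper class, for illustration only.
class FooById {

    // One map from id to the whole object; callers read whichever
    // property they need instead of consulting two parallel caches.
    static Map<Integer, Foo> byId(List<Foo> things) {
        return things.stream()
                     .collect(Collectors.toMap(Foo::getId, Function.identity()));
    }

    public static void main(String[] args) {
        Map<Integer, Foo> cache = byId(Arrays.asList(
                new Foo(1, 10, "alpha"),
                new Foo(2, 20, "beta")));
        System.out.println(cache.get(2).getCodeName());
        System.out.println(cache.get(2).getSortOrder());
    }
}
```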

NoDataFound answered Sep 29 '22 04:09


I believe the documentation is referring to the kind of side effect demonstrated by the code below:

List<Integer> matched = new ArrayList<>();
List<Integer> elements = new ArrayList<>();

for(int i=0 ; i< 10000 ; i++) {
    elements.add(i);
}

elements.parallelStream()
    .forEach(e -> {
        if(e >= 100) {
            matched.add(e);
        }
    });
System.out.println(matched.size());

This code streams through the list in parallel and tries to add elements to another list if they match a certain criterion. As the resulting list is not synchronized, running this code can throw java.lang.ArrayIndexOutOfBoundsException, or silently produce a list with fewer than the expected 9900 elements. The outcome varies from run to run.
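Because the failure is a data race, it does not show up on every run. One way to observe it is to repeat the unsafe pipeline and count how often it goes wrong. The sketch below assumes that approach; the class name, method names, and run count are hypothetical, and on some machines (a single core, for instance) the count may well be zero:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class, for illustration only.
class UnsafeSideEffectDemo {

    static final int EXPECTED = 9_900; // the values 100..9999

    // Runs the unsafe pipeline once; returns true if it threw
    // or produced a list of the wrong size.
    static boolean unsafeRunFails(List<Integer> elements) {
        List<Integer> matched = new ArrayList<>(); // not thread-safe
        try {
            elements.parallelStream().forEach(e -> {
                if (e >= 100) {
                    matched.add(e); // data race between worker threads
                }
            });
        } catch (RuntimeException ex) {
            return true; // e.g. ArrayIndexOutOfBoundsException
        }
        return matched.size() != EXPECTED;
    }

    // Repeats the unsafe run and counts the failures.
    static int countFailures(List<Integer> elements, int runs) {
        int failures = 0;
        for (int i = 0; i < runs; i++) {
            if (unsafeRunFails(elements)) {
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<Integer> elements = IntStream.range(0, 10_000)
                                          .boxed()
                                          .collect(Collectors.toList());
        int runs = 100;
        // The number printed differs between runs and machines.
        System.out.println(countFailures(elements, runs) + " of " + runs + " runs failed");
    }
}
```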

The fix is to let the stream build and return a new list, e.g.:

List<Integer> elements = new ArrayList<>();
for(int i=0 ; i< 10000 ; i++) {
    elements.add(i);
}   
List<Integer> matched = elements.parallelStream()
    .filter(e -> e >= 100)
    .collect(Collectors.toList());
System.out.println(matched.size());

Darshan Mehta answered Sep 29 '22 05:09