I'm trying to understand warnings I found in the Documentation on Streams. I've gotten in the habit of using forEach() as a general purpose iterator. And that's lead me to writing this type of code:
public class FooCache {
private static Map<Integer, Integer> sortOrderCache = new ConcurrentHashMap<>();
private static Map<Integer, String> codeNameCache = new ConcurrentHashMap<>();
public static void populateCache() {
List<Foo> myThings = getThings();
myThings.forEach(thing -> {
sortOrderCache.put(thing.getId(), thing.getSortOrder());
codeNameCache.put(thing.getId(), thing.getCodeName())
});
}
}
This is a trivialized example. I understand that this code violates Oracle's warning against stateful lamdas and side-effects. But I don't understand why this warning exists.
When running this code it appears to behave as expected. So how do I break this to demonstrate why it's a bad idea?
In sort, I read this:
If executed in parallel, the non-thread-safety of ArrayList would cause incorrect results, and adding needed synchronization would cause contention, undermining the benefit of parallelism.
But can anyone add clarity to help me understand the warning?
1. Parallel Streams can actually slow you down. Java 8 brings the promise of parallelism as one of the most anticipated new features.
In general no. If the Spliterator used has the CONCURRENT characteristic, then the stream is thread-safe.
From the Javadoc:
Note also that attempting to access mutable state from behavioral parameters presents you with a bad choice with respect to safety and performance; if you do not synchronize access to that state, you have a data race and therefore your code is broken, but if you do synchronize access to that state, you risk having contention undermine the parallelism you are seeking to benefit from. The best approach is to avoid stateful behavioral parameters to stream operations entirely; there is usually a way to restructure the stream pipeline to avoid statefulness.
The problem here is that if you access a mutable state, you loose on two side:
Stream
tries to minimizeConcurrentHashMap
, this has a cost).Now, in your example, there are several points here:
Stream
and multi threading stream, you need to use parralelStream()
as in myThings.parralelStream()
; as it stands, the forEach
method provided by java.util.Collection
is simple for each
.HashMap
as a static
member and you mutate it. HashMap
is not threadsafe; you need to use a ConcurrentHashMap
.In the lambda, and in the case of a Stream
, you must not mutate the source of your stream:
myThings.stream().forEach(thing -> myThings.remove(thing));
This may work (but I suspect it will throw a ConcurrentModificationException
) but this will likely not work:
myThings.parallelStream().forEach(thing -> myThings.remove(thing));
That's because the ArrayList
is not thread safe.
If you use a synchronized view (Collections.synchronizedList
), then you would have a performance it because you synchronize on each access.
In your example, you would rather use:
sortOrderCache = myThings.stream()
.collect(Collectors.groupingBy(
Thing::getId, Thing::getSortOrder);
codeNameCache= myThings.stream()
.collect(Collectors.groupingBy(
Thing::getId, Thing::getCodeName);
The finisher (here the groupingBy
) does the work you were doing and might be called sequentially (I mean, the Stream may be split across several thread, the the finisher may be invoked several times (in different thread) and then it might need to merge.
By the way, you might eventually drop the codeNameCache
/sortOrderCache
and simply store the id->Thing mapping.
I believe the documentation is mentioning about the side effects demonstrated by the below code:
List<Integer> matched = new ArrayList<>();
List<Integer> elements = new ArrayList<>();
for(int i=0 ; i< 10000 ; i++) {
elements.add(i);
}
elements.parallelStream()
.forEach(e -> {
if(e >= 100) {
matched.add(e);
}
});
System.out.println(matched.size());
This code streams through the list in parallel, and tries to add elements into other list if they match the certain criteria. As the resultant list is not synchronised, you will get java.lang.ArrayIndexOutOfBoundsException
while executing the above code.
The fix would be to create a new list and return, e.g.:
List<Integer> elements = new ArrayList<>();
for(int i=0 ; i< 10000 ; i++) {
elements.add(i);
}
List<Integer> matched = elements.parallelStream()
.filter(e -> e >= 100)
.collect(Collectors.toList());
System.out.println(matched.size());
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With