I used the parallelStream to get the longest string in an array, the code is following, each time it runs, I get different result. AtomicReference
supposes to be thread safe even when used within parallelStream? But why does this happen?
public static void main(String[] args) {
AtomicReference<String> longest = new AtomicReference<>();
LongAccumulator accumulator = new LongAccumulator(Math::max, 0);
List<String> words = Arrays.asList("him", "he", "thanks", "strings", "congratulations", "platform");
words.parallelStream().forEach(
next -> longest.updateAndGet(
current -> {
String result = next.length() > accumulator.intValue() ? next : current;
accumulator.accumulate(next.length());
return result;
}
)
);
System.out.println(longest.get());
}
for one time, I get “congratulations” printed, and some time I get “platform” printed.
You are invoking LongAccumulator.intValue()
which is documented as:
Returns the current value as an int after a narrowing primitive conversion.
and following the linke to the get()
method we will learn:
Returns the current value. The returned value is NOT an atomic snapshot; invocation in the absence of concurrent updates returns an accurate result, but concurrent updates that occur while the value is being calculated might not be incorporated.
So while the AtomicReference.updateAndGet
operation is thread safe, your concurrent invocation of LongAccumulator.intValue()
and LongAccumulator.accumulate
is not. A LongAccumulator
is intended for performing concurrent accumulate
operations, followed by fetching the result after all accumulate operations have been finished. Note that even if get()
was returning a correct snapshot, the fact that the invocation of intValue()
and the subsequent accumulate()
are two distinct, hence non-atomic, operations made the operation still prone to data races.
In most cases, if you find yourself trying to manipulate data structures in a forEach
, you are using the wrong tool for the job, making the code unnecessarily complicated and error prone.
As Clayn hinted in a comment, a
words.parallelStream().max(Comparator.comparingInt(String::length))
will do the job concisely and correct.
Actually i was writing about the issue which @Holger mentioned already, i was a bit late, but i am writing this anyway as a proof of @Holger's answer. You can use AtomicReference's accumulator;
Here is the code :
public static void main(String[] args) {
AtomicReference < String > longest = new AtomicReference < > ();
List < String > words = Arrays.asList("him", "he", "thanks", "strings", "congratulations", "platform");
words.parallelStream().forEach(next - > {
longest.accumulateAndGet(next, (a, b) - >
a != null && a.length() > b.length() ? a : b
);
});
System.out.println(longest.get());
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With