 

Is Java's AtomicReference thread-safe when used within parallelStream?

Tags:

java

java-8

I used parallelStream to get the longest string in an array with the code below, but each time it runs I get a different result. Isn't AtomicReference supposed to be thread-safe, even when used within a parallelStream? Why does this happen?

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAccumulator;

public class Main {
  public static void main(String[] args) {
    AtomicReference<String> longest = new AtomicReference<>();
    LongAccumulator accumulator = new LongAccumulator(Math::max, 0);
    List<String> words = Arrays.asList("him", "he", "thanks", "strings", "congratulations", "platform");
    words.parallelStream().forEach(
            next -> longest.updateAndGet(
                       current -> {
                          String result = next.length() > accumulator.intValue() ? next : current;
                          accumulator.accumulate(next.length());
                          return result;
                       }
                    )
    );
    System.out.println(longest.get());
  }
}

Sometimes I get “congratulations” printed, and sometimes “platform”.

David asked Aug 31 '16 12:08



2 Answers

You are invoking LongAccumulator.intValue(), which is documented as:

Returns the current value as an int after a narrowing primitive conversion.

and following the link to the get() method, we learn:

Returns the current value. The returned value is NOT an atomic snapshot; invocation in the absence of concurrent updates returns an accurate result, but concurrent updates that occur while the value is being calculated might not be incorporated.

So while the AtomicReference.updateAndGet operation is thread-safe, your concurrent invocation of LongAccumulator.intValue() and LongAccumulator.accumulate() is not. A LongAccumulator is intended for performing concurrent accumulate operations, followed by fetching the result after all accumulate operations have finished. Note that even if get() returned a correct snapshot, the invocation of intValue() and the subsequent accumulate() are two distinct, hence non-atomic, operations, so the code would still be prone to data races.
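A minimal sketch of the usage pattern described above, assuming only the maximum length is needed: every parallel task only calls accumulate(), and get() is read once, after the terminal forEach has returned. The class MaxLengthAccumulator and method maxLength are hypothetical names for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.LongAccumulator;

public class MaxLengthAccumulator {

    // Accumulate concurrently, read once at the end.
    public static long maxLength(List<String> words) {
        LongAccumulator maxLen = new LongAccumulator(Math::max, 0);
        // Parallel tasks only write to the accumulator; nobody reads it here.
        words.parallelStream().forEach(w -> maxLen.accumulate(w.length()));
        // forEach has returned, so every accumulate() call has completed
        // and get() reflects all of them.
        return maxLen.get();
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("him", "he", "thanks", "strings", "congratulations", "platform");
        System.out.println(maxLength(words)); // prints 15
    }
}
```

Note that this yields only the length, not the string itself; getting the string in the same pass needs the comparison and the update to happen atomically together.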

In most cases, if you find yourself trying to manipulate data structures in a forEach, you are using the wrong tool for the job, making the code unnecessarily complicated and error-prone. As Clayn hinted in a comment, words.parallelStream().max(Comparator.comparingInt(String::length)) will do the job concisely and correctly.
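A self-contained version of the max() approach suggested above; the class LongestWordMax and method longest are hypothetical names. Stream.max returns an Optional<String>, since the stream might be empty.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class LongestWordMax {

    // The stream performs the reduction itself and combines the per-thread
    // partial results, so no shared mutable state is involved.
    public static Optional<String> longest(List<String> words) {
        return words.parallelStream()
                    .max(Comparator.comparingInt(String::length));
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("him", "he", "thanks", "strings", "congratulations", "platform");
        // Fall back to a placeholder if the list was empty.
        System.out.println(longest(words).orElse("<none>")); // prints congratulations
    }
}
```

For this input, the result is always “congratulations”, regardless of how the work is split across threads.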

Holger answered Nov 04 '22 17:11



I was writing about the same issue @Holger already mentioned but was a bit late; I'm posting this anyway as supporting evidence for @Holger's answer. You can use AtomicReference's accumulateAndGet method.

Here is the code:

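import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class Main {
    public static void main(String[] args) {
        AtomicReference<String> longest = new AtomicReference<>();
        List<String> words = Arrays.asList("him", "he", "thanks", "strings", "congratulations", "platform");

        words.parallelStream().forEach(next -> {
            // a is the current value (null before the first update), b is next;
            // the compare-and-replace is applied atomically by accumulateAndGet.
            longest.accumulateAndGet(next, (a, b) ->
                a != null && a.length() > b.length() ? a : b
            );
        });

        System.out.println(longest.get());
    }
}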
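For comparison with the question's code, here is a sketch of the same idea written with updateAndGet, which the question used (class and method names are hypothetical). The key difference from the question is that the decision compares next only against current, the value being replaced, instead of against separate shared state. The AtomicReference Javadoc also asks for a side-effect-free update function, because it may be re-applied when an attempted update fails due to contention; the question's lambda, which calls accumulator.accumulate(), does not meet that requirement.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class LongestWordUpdate {

    public static String longest(List<String> words) {
        AtomicReference<String> longest = new AtomicReference<>();
        words.parallelStream().forEach(next ->
            // The decision uses only `current` and `next`, so comparing and
            // swapping form one atomic update. The function has no side
            // effects, so re-applying it under contention is harmless.
            longest.updateAndGet(current ->
                current == null || next.length() > current.length() ? next : current));
        // Returns null if the list was empty.
        return longest.get();
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("him", "he", "thanks", "strings", "congratulations", "platform");
        System.out.println(longest(words)); // prints congratulations
    }
}
```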
Ömer Erden answered Nov 04 '22 19:11
