 

How to make stream reduce be thread safe?

The Java Stream API offers a general .reduce(identity, accumulator) method.

It is pretty clear from the Javadocs that the accumulator should be a stateless function.

However, I have a question about the identity object: should it be thread-safe?

Let's say the identity is a Java object and the accumulator modifies that object in a way that is not atomic, e.g. the accumulator inspects the identity's state and then decides exactly how to modify its internal state. Clearly, several reduce operations may run at the same time. In this case several questions arise:

  • Should each single reduce operation be atomic with respect to the identity object?
  • Is it enough just to make the identity object immutable and return a new instance upon each reduce?
fyrkov asked Feb 22 '21 15:02




2 Answers

Ordinarily, accumulator is an English word that means: "You are completely hosed if you want parallelism". It's right there in the word: to accumulate means to gather over time. There is no way to do it right except to start from the beginning and apply accumulation until you are done.

But Java gets around this by adding two requirements:

  1. Associativity. a X (b X c) must produce the same result as (a X b) X c, where X is the accumulator function.
  2. Identity. ident X a must be equal to a, where ident is the identity value you pass to reduce and X is the accumulator function.

Let's use as an example the function (a, b) -> a + b with identity 0, which fulfills both of these requirements if your intent is to sum a list.
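The two requirements can be spot-checked on sample values. This is a minimal sketch under stated assumptions: the class ReduceLaws and its helper lawsHold are hypothetical names, and passing the check on a handful of samples only illustrates the laws, it does not prove them. It checks the identity law exactly as stated (ident X a equals a) plus associativity, for sum with 0 and, as a counterexample, subtraction with 0.

```java
import java.util.List;
import java.util.function.BinaryOperator;

public class ReduceLaws {

    // Hypothetical helper: checks the two reduce requirements on the given
    // sample values only. A true result is evidence, not a proof.
    static <T> boolean lawsHold(BinaryOperator<T> op, T identity, List<T> samples) {
        for (T a : samples) {
            // Identity law: ident X a must equal a
            if (!op.apply(identity, a).equals(a)) {
                return false;
            }
            for (T b : samples) {
                for (T c : samples) {
                    // Associativity: a X (b X c) must equal (a X b) X c
                    T left = op.apply(a, op.apply(b, c));
                    T right = op.apply(op.apply(a, b), c);
                    if (!left.equals(right)) {
                        return false;
                    }
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> samples = List.of(1, 5, 9, 12);
        // Sum with identity 0 satisfies both requirements.
        System.out.println(lawsHold((a, b) -> a + b, 0, samples)); // true
        // Subtraction with 0 fails: 0 - 1 != 1, and 1 - (5 - 9) != (1 - 5) - 9.
        System.out.println(lawsHold((a, b) -> a - b, 0, samples)); // false
    }
}
```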

Java can parallelize this by summing arbitrary runs of terms and then summing the results of those. [1, 5, 9, 12] can be summed by first splitting the list in two, handing the two sublists to separate threads to sum individually, and then summing the answers each thread provides. This implies that Java will start accumulation multiple times at arbitrary points in the stream, and will apply the identity as part of its accumulation any number of times, at arbitrary points. That quickly causes problems if your identity object is itself mutable.
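The split described in the paragraph above can be simulated by hand to show why the identity gets used more than once. This is a simplified model, not how the JDK actually schedules work (a real parallel stream splits via spliterators on the fork/join pool and may split many times); the class SplitSum and its methods are hypothetical names for illustration. The last two lines use 1 as a fake "identity" to show that a value which is not a true identity gets counted once per chunk.

```java
import java.util.List;
import java.util.function.BinaryOperator;

public class SplitSum {

    // Reduces one chunk sequentially, starting fresh from the identity.
    static int reduceChunk(List<Integer> chunk, int identity, BinaryOperator<Integer> op) {
        int acc = identity;
        for (int x : chunk) {
            acc = op.apply(acc, x);
        }
        return acc;
    }

    // Simplified model of a parallel reduce: split the input in two, reduce
    // each half from the identity (so the identity is used twice), then
    // combine the two partial results with the same operator.
    static int splitAndReduce(List<Integer> values, int identity, BinaryOperator<Integer> op) {
        int mid = values.size() / 2;
        int left = reduceChunk(values.subList(0, mid), identity, op);
        int right = reduceChunk(values.subList(mid, values.size()), identity, op);
        return op.apply(left, right);
    }

    public static void main(String[] args) {
        List<Integer> values = List.of(1, 5, 9, 12);
        BinaryOperator<Integer> sum = (a, b) -> a + b;
        // [1, 5] -> 6 and [9, 12] -> 21, combined -> 27
        System.out.println(splitAndReduce(values, 0, sum));            // 27
        // A real parallel stream agrees for sum with identity 0.
        System.out.println(values.parallelStream().reduce(0, sum));    // 27
        // 1 is not an identity for +, so it is counted once per chunk:
        System.out.println(reduceChunk(values, 1, sum));               // 28
        System.out.println(splitAndReduce(values, 1, sum));            // 29
    }
}
```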

There's basically no way to combine the notion of a mutable identity object with Java's reduce function. It is fundamentally not designed to work that way.

Contrast this with the sum example: the (a, b) -> a + b accumulator modifies neither a nor b; instead, they are combined into a newly created third value, and that's how you should use this method.

Contrast this with foldLeft from certain other languages, which requires neither that accumulatorFunction(ident, A) equal A nor associativity, but then by definition cannot be parallelized at all. Such a foldLeft can be used with mutable state. For example, here is an implementation of summing using foldLeft, in pseudocode (note that new int[1] is used here as a mutable integer, and the accumulator returns that same array each time):

int sum = stream.foldLeft(new int[1], (int[] a, int b) -> { a[0] += b; return a; })[0];
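The foldLeft call above is pseudocode: java.util.stream.Stream has no foldLeft method. A minimal sketch of what such a helper could look like in Java, assuming a strictly sequential, in-order walk over the stream's iterator; the class FoldLeft and its static foldLeft method are hypothetical. Because only one thread ever touches the running value, the accumulator is free to mutate it.

```java
import java.util.Iterator;
import java.util.function.BiFunction;
import java.util.stream.Stream;

public class FoldLeft {

    // Hypothetical helper (not part of the JDK): folds the stream strictly in
    // order on the calling thread. There is only ever one running accumulator,
    // so mutating it in f is safe here, unlike with Stream.reduce.
    static <T, R> R foldLeft(Stream<T> stream, R initial, BiFunction<R, ? super T, R> f) {
        R acc = initial;
        Iterator<T> it = stream.iterator();
        while (it.hasNext()) {
            acc = f.apply(acc, it.next());
        }
        return acc;
    }

    public static void main(String[] args) {
        // new int[1] acts as a mutable integer, as in the pseudocode.
        int sum = foldLeft(Stream.of(1, 5, 9, 12), new int[1], (a, b) -> {
            a[0] += b;  // mutate the single running accumulator
            return a;   // hand the same array to the next step
        })[0];
        System.out.println(sum); // 27
    }
}
```

This helper deliberately gives up parallelism; that is exactly the trade-off the answer describes.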

This notion (where the left-hand side of your accumulator function is always the same thing, namely your identity object, modified to integrate each value in the stream as you move along it) is not compatible with Java's reduce, and as far as I can recall, Java has no (easy) way to do this kind of thing to a stream.
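For comparison, and not part of the original answer: the java.util.stream package documentation describes a separate "mutable reduction" operation, collect(supplier, accumulator, combiner), available on Stream and on IntStream. Instead of sharing one identity object, it calls the supplier to get a fresh container for each subtask and merges containers with the combiner. A minimal sketch, assuming the task is summing ints; the class MutableReduction is an illustrative name.

```java
import java.util.stream.IntStream;

public class MutableReduction {

    // IntStream.collect(supplier, accumulator, combiner): each subtask gets its
    // own new int[1] from the supplier, so no container is filled by two
    // threads at once; the combiner merges finished partial containers.
    static int parallelSum(int[] values) {
        int[] box = IntStream.of(values)
                .parallel()
                .collect(
                        () -> new int[1],                       // fresh mutable integer per subtask
                        (acc, x) -> acc[0] += x,                // mutate this subtask's own container
                        (left, right) -> left[0] += right[0]);  // merge two partial results
        return box[0];
    }

    public static void main(String[] args) {
        System.out.println(parallelSum(new int[] {1, 5, 9, 12})); // 27
    }
}
```

For a plain sum, IntStream.sum() is simpler; the point here is the container-per-subtask pattern.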

Thus: it's worse! Thread-safe isn't good enough; it needs to be immutable. Once it is immutable, it is trivially thread-safe.

is it enough just to make identity object immutable and return a new instance upon each reduce?

That's not just 'good enough', that's more or less the only sane way to use reduce.
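A minimal sketch of that approach, assuming a hypothetical immutable value class Stats (a count and a sum). Its combine method reads both inputs, modifies neither, and always returns a new instance, so the shared identity Stats.EMPTY is never changed and the reduce can run in parallel.

```java
import java.util.stream.IntStream;

public final class Stats {

    // Shared identity: safe to reuse because no Stats instance is ever mutated.
    static final Stats EMPTY = new Stats(0, 0L);

    private final int count;
    private final long sum;

    Stats(int count, long sum) {
        this.count = count;
        this.sum = sum;
    }

    static Stats of(int value) {
        return new Stats(1, value);
    }

    // Accumulator: associative, EMPTY is its identity, and it returns a new instance.
    Stats combine(Stats other) {
        return new Stats(count + other.count, sum + other.sum);
    }

    int count() { return count; }

    long sum() { return sum; }

    static Stats summarize(int... values) {
        return IntStream.of(values)
                .parallel()
                .mapToObj(Stats::of)
                .reduce(EMPTY, Stats::combine);
    }

    public static void main(String[] args) {
        Stats s = summarize(1, 5, 9, 12);
        System.out.println(s.count() + " values, sum " + s.sum()); // 4 values, sum 27
        // The identity is untouched after the parallel reduce.
        System.out.println(EMPTY.count() + " " + EMPTY.sum());    // 0 0
    }
}
```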

rzwitserloot answered Oct 19 '22 08:10


This is covered by the documentation, though not directly; it is implied.

The identity value must be an identity for the accumulator function. This means that for all t, accumulator.apply(identity, t) is equal to t.

As soon as the identity is modified, as you describe, even if in a thread-safe way, the rule above is violated, so there is no guarantee of the expected result.
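A small demonstration of how a mutated identity breaks the rule, even without any concurrency. This is an anti-pattern, shown only for illustration: the hypothetical helper concatWith passes a StringBuilder as the identity and uses an accumulator that appends to (and returns) its left argument. The sketch stays sequential so the output is deterministic; in a parallel stream the same object would additionally be appended to from several threads, with unpredictable results.

```java
import java.util.stream.Stream;

public class MutatedIdentity {

    // Anti-pattern: the accumulator mutates its left argument, which on the
    // first step is the identity itself, so the identity accumulates state.
    static String concatWith(StringBuilder identity, String... parts) {
        return Stream.of(parts)
                .map(StringBuilder::new)                  // wrap each element
                .reduce(identity, (a, b) -> a.append(b))  // mutates and returns a
                .toString();
    }

    public static void main(String[] args) {
        StringBuilder identity = new StringBuilder();
        // Sequentially, the first call looks correct...
        System.out.println(concatWith(identity, "a", "b", "c")); // abc
        // ...but the "identity" now holds "abc", so it is no longer an identity,
        System.out.println(identity);                             // abc
        // ...and reusing it silently corrupts the next result.
        System.out.println(concatWith(identity, "x", "y"));       // abcxy
    }
}
```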

For the second question, the answer is slightly more involved. You do not have to make the identity immutable, as long as no one abuses it (by modifying its internal state). Of course, making it immutable helps a lot in that regard.
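A minimal sketch of the "not immutable, but never modified" case, assuming the identity is an ordinary ArrayList (a mutable type) and the accumulator always copies its inputs into a brand-new list; the class UnmodifiedIdentity and method collectInOrder are hypothetical names. Because nothing writes to the identity, it remains a valid identity and the parallel result matches the sequential one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

public class UnmodifiedIdentity {

    // The identity is a mutable ArrayList, but nothing ever modifies it:
    // every accumulator call builds a fresh list from copies of its inputs.
    static List<Integer> collectInOrder(List<Integer> identity, int... values) {
        return IntStream.of(values)
                .parallel()
                .boxed()
                .map(Collections::singletonList)
                .reduce(identity, (a, b) -> {
                    List<Integer> joined = new ArrayList<>(a); // copy, never mutate a
                    joined.addAll(b);                          // write only to the new list
                    return joined;
                });
    }

    public static void main(String[] args) {
        List<Integer> identity = new ArrayList<>();
        System.out.println(collectInOrder(identity, 1, 2, 3, 4)); // [1, 2, 3, 4]
        System.out.println(identity.isEmpty());                 // true
    }
}
```

Copying on every step is quadratic, so this only illustrates the point; for building a list, collect(Collectors.toList()) is the idiomatic choice.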

Eugene answered Oct 19 '22 10:10