
Java Stream reduce unexplained behaviour

Can anyone please point me in the right direction as I cannot understand the issue.

I am executing the following method:

private static void reduce_parallelStream() {
    List<String> vals = Arrays.asList("a", "b");

    List<String> join = vals.parallelStream().reduce(new ArrayList<String>(),
            (List<String> l, String v) -> {

                l.add(v);

                return l;
            }, (a, b) -> {                   
                a.addAll(b);
                return a;
            }

    );

    System.out.println(join);

}

It prints

[null, a, null, a]

I cannot understand why it puts two nulls in the resulting list. I expected the answer to be

[a, b]

because, as it is a parallel stream, I assumed the first parameter to reduce, new ArrayList(), would be evaluated twice, once for each input value a and b.

Then, because it is a parallel stream, I expected the accumulator to be called twice, each call receiving one input ("a" or "b") together with one of the lists created from the seed value. So a would be added to list 1 and b to list 2 (or vice versa). Afterwards the combiner would merge both lists. That is not what happens.

Interestingly, if I put print statements inside my accumulator to print the list, the output changes. The following

private static void reduce_parallelStream() {
    List<String> vals = Arrays.asList("a", "b");

    List<String> join = vals.parallelStream().reduce(new ArrayList<String>(),
            (List<String> l, String v) -> {
                System.out.printf("l is %s", l);
                l.add(v);
                System.out.printf("l is %s", l);
                return l;
            }, (a, b) -> {
                a.addAll(b);
                return a;
            }

    );

    System.out.println(join);

}

results in this output

l is []l is [b]l is [b, a]l is [b, a][b, a, b, a]

Can anyone please explain this behaviour?

tangokhi asked Jan 25 '23 19:01



2 Answers

You should use Collections.synchronizedList() when working with parallelStream(), because ArrayList is not thread-safe and you get unexpected behavior when accessing it concurrently, as you are doing with parallelStream().

I have modified your code and now it's working correctly:

private static void reduce_parallelStream() {
    List<String> vals = Arrays.asList("a", "b");

    // Use a synchronized list with parallelStream()
    List<String> join = vals.parallelStream().reduce(Collections.synchronizedList(new ArrayList<>()),
            (l, v) -> {
                l.add(v);
                return l;
            }, (a, b) -> a // a and b are the same list here, so addAll() would duplicate the output, e.g. [a, b, a, b]
    );
    System.out.println(join);
}

Output:

Sometimes you'll get this output:

[a, b]

And sometimes this one:

[b, a]

The reason is that with parallelStream() the order of execution is not guaranteed.
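To see why the combiner above can simply return a, here is a minimal sketch (the class and method names are hypothetical) that records whether the combiner is ever handed two different list objects. Because every accumulator call mutates and returns the one identity list, a and b are expected to always be the same instance, so the flag should stay false.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class SharedIdentityDemo {

    // Runs the synchronizedList-based reduce and reports whether the combiner
    // ever received two different list objects.
    static boolean combinerSawDistinctLists() {
        List<String> vals = Arrays.asList("a", "b");
        AtomicBoolean distinct = new AtomicBoolean(false);

        List<String> join = vals.parallelStream().reduce(
                Collections.synchronizedList(new ArrayList<String>()),
                (l, v) -> {
                    l.add(v); // every call mutates the single shared identity list
                    return l;
                },
                (a, b) -> {
                    if (a != b) {
                        distinct.set(true); // would mean separate partial results existed
                    }
                    return a;
                });

        System.out.println(join); // [a, b] or [b, a], depending on thread timing
        return distinct.get();
    }

    public static void main(String[] args) {
        System.out.println("combiner saw distinct lists: " + combinerSawDistinctLists());
    }
}
```

If the stream happens not to split at all, the combiner is never invoked and the flag stays false for that reason too.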

Mushif Ali Nawaz answered Jan 30 '23 03:01



"as it is a parallel stream so the first parameter to reduce new ArrayList() would probably be called twice for each input value a and b."

That's where you are wrong. The first parameter is a single ArrayList instance, evaluated once before reduce is even called, not a lambda expression that can produce multiple ArrayList instances.

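Therefore, the entire reduction operates on a single ArrayList instance, which every thread mutates. Since ArrayList is not thread-safe, these unsynchronized concurrent add() calls can lose elements or leave null slots, and the results may change from one execution to the next. Adding print statements changes the timing of the threads, which is likely why your second experiment produced different output.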
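The claim that the first argument is one object can be checked directly. Below is a minimal sketch (class and method names are hypothetical; four elements are used only to encourage more splitting) whose accumulator records, by identity, every list it receives without mutating it. It is expected to report exactly one list, the seed itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

class SingleIdentityDemo {

    // Returns how many distinct List objects the accumulator was handed.
    static int distinctListsSeen() {
        List<String> vals = Arrays.asList("a", "b", "c", "d");
        List<String> seed = new ArrayList<>(); // evaluated exactly once, before reduce runs

        // Identity-based set: two lists count as different only if they are different objects.
        Set<List<String>> seen = Collections.synchronizedSet(
                Collections.newSetFromMap(new IdentityHashMap<List<String>, Boolean>()));

        List<String> result = vals.parallelStream().reduce(
                seed,
                (l, v) -> {
                    seen.add(l); // record the list object this call received; do not mutate it
                    return l;
                },
                (a, b) -> a);

        System.out.println("result is the seed: " + (result == seed));
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct lists seen by accumulator: " + distinctListsSeen());
    }
}
```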

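Your combiner receives that same instance as both a and b, so it actually adds all the elements of the list to itself, which doubles its contents ([b, a] becomes [b, a, b, a]).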
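The doubling can be reproduced without any threads. A minimal sketch (the class name is hypothetical) that runs the question's combiner body with both arguments pointing at one list, as happens when the single identity list reaches the combiner as both partial results:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SelfAddAllDemo {

    // Mimics combiner.apply(list, list): both "partial results" are the same object.
    static List<String> combineWithItself(List<String> list) {
        List<String> a = list;
        List<String> b = list; // same instance as a
        a.addAll(b);           // appends a copy of the list's current contents to itself
        return a;
    }

    public static void main(String[] args) {
        List<String> shared = new ArrayList<>(Arrays.asList("b", "a"));
        System.out.println(combineWithItself(shared)); // prints [b, a, b, a]
    }
}
```

This matches the [b, a, b, a] printed at the end of the question's second run.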

You can obtain the expected [a, b] output if both the accumulator and the combiner produce a new ArrayList instead of mutating their input:

List<String> join = vals.parallelStream().reduce(
        new ArrayList<String>(),
        (List<String> l, String v) -> {
            List<String> cl = new ArrayList<>(l);
            cl.add(v);
            return cl;
        }, (a, b) -> {
            List<String> ca = new ArrayList<>(a);
            ca.addAll(b);
            return ca;
        }
);
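To check this version, here is a small harness (the class name and the repetition count are arbitrary choices of this sketch) that runs it repeatedly on a parallel stream. Because reduce on an ordered stream combines partial results in encounter order, the result should always come back in the original order, unlike the synchronizedList approach in the other answer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class NonMutatingReduceDemo {

    static List<String> join(List<String> vals) {
        return vals.parallelStream().reduce(
                new ArrayList<String>(),
                (List<String> l, String v) -> {
                    List<String> cl = new ArrayList<>(l); // copy; never mutate l (it may be the shared identity)
                    cl.add(v);
                    return cl;
                },
                (a, b) -> {
                    List<String> ca = new ArrayList<>(a); // copy; never mutate a
                    ca.addAll(b);
                    return ca;
                });
    }

    public static void main(String[] args) {
        List<String> vals = Arrays.asList("a", "b", "c", "d", "e");
        for (int i = 0; i < 1_000; i++) {
            List<String> result = join(vals);
            if (!result.equals(vals)) {
                throw new AssertionError("unexpected result: " + result);
            }
        }
        System.out.println(join(vals)); // [a, b, c, d, e]
    }
}
```

The price of this design is a fresh copy at every accumulator and combiner step, which is quadratic in the number of elements; that cost is one reason a mutable reduction is better expressed with collect.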

That said, you shouldn't be using reduce at all. collect is the correct way to perform a mutable reduction:

List<String> join = vals.parallelStream()
                        .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);

As you can see, unlike with reduce, the first argument here is a Supplier<ArrayList<String>>, which can be called to create as many intermediate ArrayList instances as necessary.
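Here is a minimal sketch (hypothetical class name; the counter exists only for illustration) that replaces ArrayList::new with a lambda that counts its invocations. The number of calls depends on how the stream is split, so it is printed rather than asserted as a fixed value; what matters is that each split accumulates into its own ArrayList and no list is shared between threads. Collectors.toList() performs essentially the same mutable reduction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

class CollectDemo {

    static final AtomicInteger SUPPLIER_CALLS = new AtomicInteger();

    static List<String> join(List<String> vals) {
        return vals.parallelStream().collect(
                () -> {
                    SUPPLIER_CALLS.incrementAndGet(); // count how many fresh containers are created
                    return new ArrayList<String>();
                },
                ArrayList::add,     // accumulator: mutate this split's own container
                ArrayList::addAll); // combiner: append the right container to the left one
    }

    public static void main(String[] args) {
        List<String> vals = Arrays.asList("a", "b", "c", "d");
        System.out.println(join(vals)); // encounter order is preserved: [a, b, c, d]
        System.out.println("supplier calls: " + SUPPLIER_CALLS.get());

        // The built-in collector for the same job.
        List<String> viaCollector = vals.parallelStream().collect(Collectors.toList());
        System.out.println(viaCollector);
    }
}
```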

Eran answered Jan 30 '23 04:01
