Can anyone please point me in the right direction? I cannot understand the issue.
I am executing the following method:
private static void reduce_parallelStream() {
    List<String> vals = Arrays.asList("a", "b");
    List<String> join = vals.parallelStream().reduce(new ArrayList<String>(),
            (List<String> l, String v) -> {
                l.add(v);
                return l;
            }, (a, b) -> {
                a.addAll(b);
                return a;
            }
    );
    System.out.println(join);
}
It prints
[null, a, null, a]
I cannot understand why it puts two nulls in the resulting list. I expected the answer to be
[a, b]
as it is a parallel stream, so the first parameter to reduce, new ArrayList(), would probably be called twice, once for each input value a and b.
Then the accumulator function would probably be called twice, since it is a parallelStream, receiving one input ("a" or "b") per call along with one of the lists provided by the seed value. So a is added to list 1 and b is added to list 2 (or vice versa). Afterwards the combiner would combine both lists, but that is not what happens.
Interestingly, if I put print statements inside my accumulator to print the list before and after the add, the output changes. The following
private static void reduce_parallelStream() {
    List<String> vals = Arrays.asList("a", "b");
    List<String> join = vals.parallelStream().reduce(new ArrayList<String>(),
            (List<String> l, String v) -> {
                System.out.printf("l is %s", l);
                l.add(v);
                System.out.printf("l is %s", l);
                return l;
            }, (a, b) -> {
                a.addAll(b);
                return a;
            }
    );
    System.out.println(join);
}
results in this output
l is []l is [b]l is [b, a]l is [b, a][b, a, b, a]
Can anyone please explain?
You should use Collections.synchronizedList() when working with parallelStream(), because ArrayList is not thread-safe and you get unexpected behavior when it is accessed concurrently, as happens here with parallelStream().
I have modified your code and now it works correctly:
private static void reduce_parallelStream() {
    List<String> vals = Arrays.asList("a", "b");
    // Use a synchronized list with parallelStream()
    List<String> join = vals.parallelStream().reduce(Collections.synchronizedList(new ArrayList<>()),
            (l, v) -> {
                l.add(v);
                return l;
            }, (a, b) -> a // a and b are the same shared list; calling addAll() here would duplicate the elements, e.g. [a, b, a, b]
    );
    System.out.println(join);
}
Output:
Sometimes you'll get this output:
[a, b]
And sometimes this one:
[b, a]
The reason is that this is a parallelStream(), so you can't be sure about the order of execution.
"as it is a parallel stream, so the first parameter to reduce, new ArrayList(), would probably be called twice, once for each input value a and b."
That's where you are wrong. The first parameter is a single ArrayList instance, not a lambda expression that can produce multiple ArrayList instances.
Therefore, the entire reduction operates on a single ArrayList instance. When multiple threads modify that ArrayList in parallel, the results may differ from one execution to the next.
Your combiner actually adds all the elements of a List to the same List (a and b are the same instance), which is why the elements are duplicated.
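The claim that the reduction works on one shared instance can be checked with a small sketch. It deliberately does not mutate the list; it only records which list objects the accumulator receives. The class and method names (SharedIdentityDemo, distinctAccumulatorInputs) are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

// Hypothetical demo class, not part of the original question.
class SharedIdentityDemo {

    // Returns how many distinct list objects were handed to the accumulator.
    static int distinctAccumulatorInputs() {
        List<String> vals = Arrays.asList("a", "b", "c", "d");

        // Identity-based, thread-safe set: two lists count as the same entry
        // only if they are the very same object.
        Set<List<String>> seen = Collections.synchronizedSet(
                Collections.newSetFromMap(new IdentityHashMap<List<String>, Boolean>()));

        vals.parallelStream().reduce(
                new ArrayList<String>(),   // evaluated once, before reduce() is called
                (l, v) -> {
                    seen.add(l);           // record the instance; no mutation here
                    return l;
                },
                (a, b) -> a);

        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctAccumulatorInputs()); // prints 1
    }
}
```

Every accumulator call, on whichever thread it runs, starts from the same seed object, so only one instance is ever seen.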
You can obtain the expected [a, b] output if both the accumulator and the combiner produce a new ArrayList instead of mutating their input ArrayList:
List<String> join = vals.parallelStream().reduce(
        new ArrayList<String>(),
        (List<String> l, String v) -> {
            List<String> cl = new ArrayList<>(l);
            cl.add(v);
            return cl;
        }, (a, b) -> {
            List<String> ca = new ArrayList<>(a);
            ca.addAll(b);
            return ca;
        }
);
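As a rough check, the copying approach can be wrapped in a method and run on a larger input. This is only a sketch; the names CopyingReduceDemo and joinByCopying are invented for the example. Note that every accumulator and combiner call copies a list, so the cost grows quickly with the input size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class wrapping the copying reduction shown above.
class CopyingReduceDemo {

    static List<String> joinByCopying(List<String> vals) {
        return vals.parallelStream().reduce(
                new ArrayList<String>(),
                (List<String> l, String v) -> {
                    List<String> cl = new ArrayList<>(l); // copy, never touch l
                    cl.add(v);
                    return cl;
                },
                (a, b) -> {
                    List<String> ca = new ArrayList<>(a); // copy, never touch a or b
                    ca.addAll(b);
                    return ca;
                });
    }

    public static void main(String[] args) {
        List<String> input = IntStream.range(0, 200)
                .mapToObj(Integer::toString)
                .collect(Collectors.toList());
        // Nothing is mutated in place, so the result keeps the encounter order.
        System.out.println(joinByCopying(input).equals(input)); // prints true
    }
}
```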
That said, you shouldn't be using reduce at all. collect is the correct way to perform a mutable reduction:
List<String> join = vals.parallelStream()
        .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
As you can see, here, unlike in reduce, the first parameter you pass is a Supplier<ArrayList<String>>, which can be used to generate as many intermediate ArrayList instances as necessary.
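A self-contained sketch of the collect variant, for anyone who wants to run it. The class and method names (CollectDemo, joinWithCollect, joinWithToList) are invented for this example. The second method uses Collectors.toList(), the standard library collector for the same job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class, not from the original answer.
class CollectDemo {

    // Three-argument collect: supplier, accumulator, combiner.
    static List<String> joinWithCollect(List<String> vals) {
        List<String> result = vals.parallelStream()
                .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
        return result;
    }

    // Same idea using the ready-made collector.
    static List<String> joinWithToList(List<String> vals) {
        return vals.parallelStream().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> vals = Arrays.asList("a", "b");
        System.out.println(joinWithCollect(vals)); // prints [a, b]
        System.out.println(joinWithToList(vals));  // prints [a, b]
    }
}
```

Because each intermediate container comes from the supplier and is only touched by one thread at a time, no synchronization is needed, and for an ordered source such as a List the result keeps the encounter order.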