Can anyone point me in the right direction? I cannot understand the issue.
I am executing the following method:
private static void reduce_parallelStream() {
    List<String> vals = Arrays.asList("a", "b");
    List<String> join = vals.parallelStream().reduce(new ArrayList<String>(),
            (List<String> l, String v) -> {
                l.add(v);
                return l;
            }, (a, b) -> {
                a.addAll(b);
                return a;
            }
    );
    System.out.println(join);
}
It prints
[null, a, null, a]
I cannot understand why it puts two nulls in the resulting list. I expected the answer to be [a, b]. Since it is a parallel stream, I thought the first parameter to reduce, new ArrayList(), would probably be evaluated twice, once for each input value a and b.
Then the accumulator function would probably be called twice, since it is a parallel stream, with each call receiving one input ("a" or "b") together with one of the lists produced from the seed value. So a is added to list 1 and b is added to list 2 (or vice versa). Afterwards the combiner would combine both lists, but that doesn't happen.
Interestingly, if I put a print statement inside my accumulator to print the input value, the output changes. So the following
private static void reduce_parallelStream() {
    List<String> vals = Arrays.asList("a", "b");
    List<String> join = vals.parallelStream().reduce(new ArrayList<String>(),
            (List<String> l, String v) -> {
                System.out.printf("l is %s", l);
                l.add(v);
                System.out.printf("l is %s", l);
                return l;
            }, (a, b) -> {
                a.addAll(b);
                return a;
            }
    );
    System.out.println(join);
}
results in this output
l is []l is [b]l is [b, a]l is [b, a][b, a, b, a]
Can anyone please explain?
You should be using Collections.synchronizedList() when working with parallelStream(), because ArrayList is not thread-safe and you get unexpected behavior when you access it concurrently, as you are doing with parallelStream().
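To see the thread-safety point in isolation, here is a minimal sketch (the class and method names are hypothetical) that adds 10,000 integers from a parallel stream into a list wrapped with Collections.synchronizedList(). Each add() on the wrapper is guarded by the wrapper's lock, so every element arrives exactly once, although in no particular order. Doing the same with a plain ArrayList can lose elements, leave null slots or throw an exception, which is consistent with the [null, a, null, a] output in the question.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

public class SynchronizedAddDemo {
    // Adds 0..n-1 from a parallel stream into one shared, synchronized list.
    static List<Integer> fillInParallel(int n) {
        List<Integer> shared = Collections.synchronizedList(new ArrayList<>());
        // forEach runs on several threads; each add() locks the wrapper first
        IntStream.range(0, n).parallel().forEach(i -> shared.add(i));
        return shared;
    }

    public static void main(String[] args) {
        List<Integer> result = fillInParallel(10_000);
        // No element is lost; only the order is unspecified
        System.out.println("size = " + result.size()); // size = 10000
    }
}
```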
I have modified your code and now it's working correctly:
private static void reduce_parallelStream() {
    List<String> vals = Arrays.asList("a", "b");
    // Use a synchronized list with parallelStream()
    List<String> join = vals.parallelStream().reduce(Collections.synchronizedList(new ArrayList<>()),
            (l, v) -> {
                l.add(v);
                return l;
            },
            (a, b) -> a // don't use addAll() here: it would duplicate the output, e.g. [a, b, a, b]
    );
    System.out.println(join);
}
Output:
Sometimes you'll get this:
[a, b]
and sometimes this:
[b, a]
The reason is that with parallelStream() you can't be sure about the order of execution.
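The synchronizedList approach in this answer only works because every partial result is one and the same shared list, so the combiner has nothing left to merge and can simply return a. A minimal sketch (the class and method names are hypothetical) runs the same pattern over 1,000 strings and checks that each element ends up in the result exactly once. The resulting order is whatever order the threads happened to add in, not the encounter order of the stream.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SharedListReduceDemo {
    // Same shape as the answer above: one shared synchronized list as the identity.
    static List<String> reduceIntoSharedList(List<String> vals) {
        List<String> shared = Collections.synchronizedList(new ArrayList<>());
        return vals.parallelStream().reduce(
                shared,
                (l, v) -> { l.add(v); return l; }, // l is always the shared list
                (a, b) -> a);                       // a and b are both the shared list
    }

    public static void main(String[] args) {
        List<String> vals = IntStream.range(0, 1_000)
                .mapToObj(Integer::toString)
                .collect(Collectors.toList());
        List<String> result = reduceIntoSharedList(vals);

        // Compare ignoring order, since the add order depends on thread timing
        List<String> sortedResult = new ArrayList<>(result);
        Collections.sort(sortedResult);
        List<String> sortedVals = new ArrayList<>(vals);
        Collections.sort(sortedVals);
        System.out.println(sortedResult.equals(sortedVals)); // true
    }
}
```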
> as it is a parallel stream so the first parameter to reduce new ArrayList() would probably be called twice for each input value a and b.
That's where you are wrong. The first parameter is a single ArrayList instance, not a lambda expression that can produce multiple ArrayList instances.
Therefore, the entire reduction operates on a single ArrayList instance. When multiple threads modify that ArrayList in parallel, the results may change with each execution.
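A small sketch can make the "single instance" point visible. It uses a hypothetical class whose accumulator does not mutate anything; it only records, by reference identity, every list object it receives. An IdentityHashMap-backed set is used because all empty ArrayLists are equal() to each other, so an ordinary HashSet could not tell distinct empty lists apart. Because reduce never creates lists of its own, only the identity value you passed (and whatever your functions return) ever shows up, so exactly one distinct list is seen and the result is that same object.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SharedIdentityDemo {
    // The question's reduce shape, but the accumulator only records which
    // list object it was handed (compared with ==) and adds nothing.
    static List<String> reduceObserving(List<String> vals, List<String> identity,
                                        Set<List<String>> seen) {
        return vals.parallelStream().reduce(
                identity,
                (l, v) -> { seen.add(l); return l; },
                (a, b) -> a);
    }

    public static void main(String[] args) {
        List<String> vals = IntStream.range(0, 1_000)
                .mapToObj(Integer::toString)
                .collect(Collectors.toList());
        List<String> identity = new ArrayList<>();
        // Thread-safe set that compares elements by reference, not equals()
        Set<List<String>> seen = Collections.synchronizedSet(
                Collections.newSetFromMap(new IdentityHashMap<List<String>, Boolean>()));

        List<String> result = reduceObserving(vals, identity, seen);
        System.out.println("distinct lists seen: " + seen.size());         // 1
        System.out.println("result == identity: " + (result == identity)); // true
    }
}
```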
Your combiner actually adds all the elements of that List to the same List (both arguments are the same instance), which duplicates its contents.
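The duplication can be reproduced without any threads. In this sketch (the class and method names are hypothetical), the question's combiner is applied with the same list as both arguments, which is what happens when every partial result is the shared identity list. ArrayList.addAll copies the argument's contents via toArray() before appending, so the list simply ends up doubled, matching the [b, a, b, a] shape of the question's second run.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class SelfAddAllDemo {
    // The combiner from the question
    static final BinaryOperator<List<String>> COMBINER = (a, b) -> {
        a.addAll(b);
        return a;
    };

    static List<String> combineWithItself(List<String> list) {
        // Same instance passed as both arguments
        return COMBINER.apply(list, list);
    }

    public static void main(String[] args) {
        List<String> shared = new ArrayList<>(Arrays.asList("b", "a"));
        System.out.println(combineWithItself(shared)); // [b, a, b, a]
    }
}
```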
You can obtain the expected [a, b] output if both the accumulator and combiner functions produce a new ArrayList instead of mutating their input ArrayList:
List<String> join = vals.parallelStream().reduce(
        new ArrayList<String>(),
        (List<String> l, String v) -> {
            List<String> cl = new ArrayList<>(l);
            cl.add(v);
            return cl;
        }, (a, b) -> {
            List<String> ca = new ArrayList<>(a);
            ca.addAll(b);
            return ca;
        }
);
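Here is the same non-mutating reduce wrapped in a runnable class (the class and method names are hypothetical). Because the stream from Arrays.asList is ordered and reduce combines partial results in encounter order, the result is [a, b] on every run, and the identity list is never modified. The trade-off is that every accumulator call copies the list it receives, which means a lot of copying for large inputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CopyingReduceDemo {
    static List<String> joinByCopying(List<String> vals, List<String> identity) {
        return vals.parallelStream().reduce(
                identity,
                (List<String> l, String v) -> {
                    List<String> cl = new ArrayList<>(l); // copy; l is never mutated
                    cl.add(v);
                    return cl;
                },
                (a, b) -> {
                    List<String> ca = new ArrayList<>(a); // copy; a and b are never mutated
                    ca.addAll(b);
                    return ca;
                });
    }

    public static void main(String[] args) {
        List<String> identity = new ArrayList<>();
        List<String> join = joinByCopying(Arrays.asList("a", "b"), identity);
        System.out.println(join);     // [a, b]
        System.out.println(identity); // []
    }
}
```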
That said, you shouldn't be using reduce at all. collect is the correct way to perform a mutable reduction:
List<String> join = vals.parallelStream()
        .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
As you can see, here, unlike in reduce, the first parameter you pass is a Supplier<ArrayList<String>>, which can be used to generate as many intermediate ArrayList instances as necessary.
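A runnable sketch of the collect call (the class name and the counting supplier are hypothetical additions) makes both points observable: the result keeps encounter order for an ordered source, and the supplier is asked for a fresh list more than once. The supplier counts its invocations with an AtomicInteger. The exact count depends on how the stream is split, which depends on the input size and the common pool's parallelism, so no particular number is promised; with 10,000 elements the source is split into several chunks, so it is more than one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CollectDemo {
    // Same collect call as above, with a supplier that also counts how many
    // intermediate lists it creates.
    static List<String> join(List<String> vals, AtomicInteger containers) {
        return vals.parallelStream().collect(
                () -> {                           // supplier: a fresh list per chunk
                    containers.incrementAndGet();
                    return new ArrayList<String>();
                },
                ArrayList::add,                   // accumulator: add one element to a chunk's list
                ArrayList::addAll);               // combiner: append the right list to the left one
    }

    public static void main(String[] args) {
        AtomicInteger containers = new AtomicInteger();
        System.out.println(join(Arrays.asList("a", "b"), containers)); // [a, b]

        List<String> many = IntStream.range(0, 10_000)
                .mapToObj(Integer::toString)
                .collect(Collectors.toList());
        containers.set(0);
        System.out.println(join(many, containers).equals(many)); // true: encounter order kept
        // Depends on splitting and pool parallelism, so no exact value is claimed
        System.out.println("lists created: " + containers.get());
    }
}
```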