I have this code (a):
ComparisonResults comparisonResults = requestsList
.stream()
.map(item -> getResponse(item))
.map(item -> compareToBl(item))
.reduce(new ComparisonResults(), (result1, result2) ->
{
result1.addSingleResult(result2);
// return result1;
return new ComparisonResults(result1);
});
and this code (b):
ComparisonResults comparisonResults = requestsList
.parallelStream()
.map(item -> getResponse(item))
.map(item -> compareToBl(item))
.reduce(new ComparisonResults(), (result1, result2) ->
{
result1.addSingleResult(result2);
// return result1;
return new ComparisonResults(result1);
});
All I do is create response objects, transform them into ComparisonResults objects, and reduce them to a single ComparisonResults.
Code (a) gives comparisonResults.num_of_sub_responses == 5 (an int class member), which is correct.
Code (b) gives comparisonResults.num_of_sub_responses == 10, which is double the correct result.
Java 8 reduce should be thread-safe, right?
Am I missing anything?
getResponse and compareToBl are thread-safe.
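You are mutating an incoming object in reduce. This is wrong. The identity value, new ComparisonResults(), is a single instance that a parallel stream hands to every partial reduction, so each thread's addSingleResult call modifies that same shared object, and its contents are then counted again when the partial results are combined. It doesn’t help that you are creating a new object after modifying the incoming object.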
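If you do want to stay with reduce, the copy has to be made before anything is modified, so that neither argument is ever changed. Here is a minimal sketch of that idea. The ComparisonResults class below is a hypothetical stand-in, since the real one is not shown in the question: the field name, the int constructor, and the method bodies are assumptions made only so the example runs.

```java
import java.util.ArrayList;
import java.util.List;

class ImmutableReduceDemo {

    // Hypothetical stand-in for the question's ComparisonResults class.
    // Only the no-arg constructor, the copy constructor and addSingleResult
    // appear in the question; everything else here is assumed.
    static class ComparisonResults {
        int numOfSubResponses;

        ComparisonResults() { }

        ComparisonResults(int n) { numOfSubResponses = n; }

        ComparisonResults(ComparisonResults other) {
            numOfSubResponses = other.numOfSubResponses;
        }

        // Adds the other result's count to this one (mutates this).
        void addSingleResult(ComparisonResults other) {
            numOfSubResponses += other.numOfSubResponses;
        }
    }

    static ComparisonResults combine(List<ComparisonResults> parts) {
        return parts.parallelStream()
                .reduce(new ComparisonResults(), (result1, result2) -> {
                    // Copy first, then modify only the copy, so the shared
                    // identity object and both arguments stay untouched.
                    ComparisonResults copy = new ComparisonResults(result1);
                    copy.addSingleResult(result2);
                    return copy;
                });
    }

    public static void main(String[] args) {
        List<ComparisonResults> parts = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            parts.add(new ComparisonResults(1));
        }
        System.out.println(combine(parts).numOfSubResponses); // prints 5
    }
}
```

This stays correct in parallel, but it allocates a new object at every step, which is the cost that mutable reduction with collect avoids.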
What you want to do is
.collect(ComparisonResults::new, ComparisonResults::addSingleResult,
(a,b)->/* code to merge two ComparisonResults instances*/);
If the result of .map(item -> compareToBl(item)) is a ComparisonResults or, in other words, addSingleResult merges two ComparisonResults instances, you can use ComparisonResults::addSingleResult as the merge function as well, though its name is a bit misleading then.
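To make the collect variant concrete, here is a small runnable sketch for the case where addSingleResult can serve as both accumulator and merge function. The ComparisonResults class is again a hypothetical stand-in (the field name, the int constructor and the method body are assumptions), so treat it as an illustration rather than the actual class.

```java
import java.util.ArrayList;
import java.util.List;

class MutableReductionDemo {

    // Hypothetical stand-in for the question's ComparisonResults class;
    // the real fields and merge logic are not shown in the question.
    static class ComparisonResults {
        int numOfSubResponses;

        ComparisonResults() { }

        ComparisonResults(int n) { numOfSubResponses = n; }

        // Merges another result into this one (mutates this).
        void addSingleResult(ComparisonResults other) {
            numOfSubResponses += other.numOfSubResponses;
        }
    }

    static ComparisonResults combine(List<ComparisonResults> parts) {
        return parts.parallelStream()
                .collect(ComparisonResults::new,              // fresh container per partial reduction
                         ComparisonResults::addSingleResult,  // fold one element into a container
                         ComparisonResults::addSingleResult); // merge two partial containers
    }

    public static void main(String[] args) {
        List<ComparisonResults> parts = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            parts.add(new ComparisonResults(1));
        }
        System.out.println(combine(parts).numOfSubResponses); // prints 5
    }
}
```

Because the supplier is invoked separately for each partial reduction, no container is shared between threads, so mutating it inside the accumulator is safe here.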
You should carefully read the “Reduction” chapter of the documentation and its follow-up, “Mutable reduction”.