I'm writing a custom Java 8 collector that is supposed to compute the average over POJOs that have a getValue() method. Here's the code:
    public static Collector<BoltAggregationData, BigDecimal[], BigDecimal> avgCollector = new Collector<BoltAggregationData, BigDecimal[], BigDecimal>() {
        @Override
        public Supplier<BigDecimal[]> supplier() {
            return () -> {
                BigDecimal[] start = new BigDecimal[2];
                start[0] = BigDecimal.ZERO;
                start[1] = BigDecimal.ZERO;
                return start;
            };
        }

        @Override
        public BiConsumer<BigDecimal[], BoltAggregationData> accumulator() {
            return (a, b) -> {
                a[0] = a[0].add(b.getValue());
                a[1] = a[1].add(BigDecimal.ONE);
            };
        }

        @Override
        public BinaryOperator<BigDecimal[]> combiner() {
            return (a, b) -> {
                a[0] = a[0].add(b[0]);
                a[1] = a[1].add(b[1]);
                return a;
            };
        }

        @Override
        public Function<BigDecimal[], BigDecimal> finisher() {
            return (a) -> {
                return a[0].divide(a[1], 6, RoundingMode.HALF_UP);
            };
        }

        private final Set<Characteristics> CHARACTERISTICS = new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED));

        @Override
        public Set<Characteristics> characteristics() {
            return CHARACTERISTICS;
        }
    };
It all works well in the non-parallel case. However, when I use a parallelStream(), it sometimes doesn't work. For example, given the values from 1 to 10, it computes 53/9 instead of 55/10. When debugging, the debugger never hits the breakpoint in the combiner() function. Is there some kind of flag that I need to set?
It looks like the problem is the CONCURRENT characteristic, which does something other than what you might expect:
Indicates that this collector is concurrent, meaning that the result container can support the accumulator function being called concurrently with the same result container from multiple threads.
Instead of calling the combiner, the stream calls the accumulator concurrently, using the same BigDecimal[] a for all threads. The read-modify-write access to a is not atomic, so it goes wrong:
Thread1 -> retrieves value of a[0]: 3
Thread2 -> retrieves value of a[0]: 3
Thread1 -> adds own value: 3 + 3 = 6
Thread2 -> adds own value: 3 + 4 = 7
Thread1 -> writes 6 to a[0]
Thread2 -> writes 7 to a[0]
This leaves a[0] at 7 when it should be 10. The same kind of thing can happen with a[1], so results can be inconsistent.
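The interleaving above can be replayed on a single thread, which makes the lost update deterministic. This is only a sketch: the class and variable names (LostUpdateReplay, seenByThread1, and so on) are made up for illustration, and no real threads are involved.

```java
import java.math.BigDecimal;

public class LostUpdateReplay {

    // Replays the six steps of the trace in order on one thread.
    public static BigDecimal replay() {
        BigDecimal[] a = { new BigDecimal(3) };      // shared container, a[0] == 3

        BigDecimal seenByThread1 = a[0];             // "Thread1" reads 3
        BigDecimal seenByThread2 = a[0];             // "Thread2" reads 3

        BigDecimal sum1 = seenByThread1.add(new BigDecimal(3)); // 3 + 3 = 6
        BigDecimal sum2 = seenByThread2.add(new BigDecimal(4)); // 3 + 4 = 7

        a[0] = sum1;                                 // "Thread1" writes 6
        a[0] = sum2;                                 // "Thread2" writes 7, discarding the 6

        return a[0];
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints 7, although 3 + 3 + 4 = 10
    }
}
```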
If you remove the CONCURRENT characteristic, the combiner will be used instead.
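As a minimal sketch of that fix, the collector below keeps the question's sum/count logic but declares only UNORDERED, so each thread accumulates into its own array and the partial results are merged by the combiner. It is written with Collector.of for brevity. The nested BoltAggregationData class is a hypothetical stand-in, since the real POJO is not shown, and the class name AvgCollectorSketch and the helper averageOneToTen() are invented for the example.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.stream.Collector;
import java.util.stream.IntStream;

public class AvgCollectorSketch {

    // Hypothetical stand-in for the POJO in the question; only getValue() is known.
    public static final class BoltAggregationData {
        private final BigDecimal value;

        public BoltAggregationData(BigDecimal value) {
            this.value = value;
        }

        public BigDecimal getValue() {
            return value;
        }
    }

    // Same logic as the question, minus Characteristics.CONCURRENT.
    // a[0] holds the running sum, a[1] the running count.
    public static Collector<BoltAggregationData, BigDecimal[], BigDecimal> avgCollector() {
        return Collector.of(
            () -> new BigDecimal[] { BigDecimal.ZERO, BigDecimal.ZERO },
            (a, b) -> {
                a[0] = a[0].add(b.getValue());
                a[1] = a[1].add(BigDecimal.ONE);
            },
            (a, b) -> {
                // Merges two per-thread containers in a parallel stream.
                a[0] = a[0].add(b[0]);
                a[1] = a[1].add(b[1]);
                return a;
            },
            a -> a[0].divide(a[1], 6, RoundingMode.HALF_UP),
            Collector.Characteristics.UNORDERED);
    }

    // Averages the values 1 to 10 on a parallel stream.
    public static BigDecimal averageOneToTen() {
        return IntStream.rangeClosed(1, 10)
            .mapToObj(i -> new BoltAggregationData(BigDecimal.valueOf(i)))
            .parallel()
            .collect(avgCollector());
    }

    public static void main(String[] args) {
        System.out.println(averageOneToTen()); // 55 / 10 at scale 6: 5.500000
    }
}
```

As in the original, an empty stream would still fail in the finisher with a division by zero, so that case needs separate handling if it can occur.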
Well, that’s exactly what you request when specifying Characteristics.CONCURRENT:
Indicates that this collector is concurrent, meaning that the result container can support the accumulator function being called concurrently with the same result container from multiple threads.
If that’s not the case, as with your Collector, you shouldn’t specify that flag.
As a side note, new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED)) is quite inefficient for specifying characteristics. You can just use EnumSet.of(Characteristics.CONCURRENT, Characteristics.UNORDERED). Once you remove the incorrect CONCURRENT characteristic, you may use either EnumSet.of(Characteristics.UNORDERED) or Collections.singleton(Characteristics.UNORDERED), but a HashSet is definitely overkill.
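For illustration, here is roughly what the two lighter alternatives could look like as constants. The class and field names are made up for the example. Wrapping the EnumSet in Collections.unmodifiableSet is an addition of this sketch, made because the Collector.characteristics() documentation says the returned set should be immutable; Collections.singleton is already immutable.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;
import java.util.stream.Collector.Characteristics;

public class CharacteristicsSketch {

    // EnumSet variant; the unmodifiable wrapper is this sketch's own addition.
    public static final Set<Characteristics> WITH_ENUM_SET =
        Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED));

    // Singleton variant; immutable as returned.
    public static final Set<Characteristics> WITH_SINGLETON =
        Collections.singleton(Characteristics.UNORDERED);

    public static void main(String[] args) {
        // Both sets contain exactly UNORDERED, so they compare equal.
        System.out.println(WITH_ENUM_SET.equals(WITH_SINGLETON)); // prints true
    }
}
```

Either constant can be returned from characteristics() in place of the HashSet field.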