
Java 8 stream combiner never called

I'm writing a custom Java 8 collector that is supposed to compute the average over POJOs that have a getValue() method. Here's the code:

public static Collector<BoltAggregationData, BigDecimal[], BigDecimal> avgCollector = new Collector<BoltAggregationData, BigDecimal[], BigDecimal>() {

        @Override
        public Supplier<BigDecimal[]> supplier() {
            return () -> {
                BigDecimal[] start = new BigDecimal[2];
                start[0] = BigDecimal.ZERO;
                start[1] = BigDecimal.ZERO;
                return start;
            };
        }

        @Override
        public BiConsumer<BigDecimal[], BoltAggregationData> accumulator() {
            return (a,b) ->  {
                a[0] = a[0].add(b.getValue());
                a[1] = a[1].add(BigDecimal.ONE);
            };
        }

        @Override
        public BinaryOperator<BigDecimal[]> combiner() {
            return (a,b) -> {
                a[0] = a[0].add(b[0]);
                a[1] = a[1].add(b[1]);
                return a;
            };
        }

        @Override
        public Function<BigDecimal[], BigDecimal> finisher() {
            return (a) -> {
                return a[0].divide(a[1], 6 , RoundingMode.HALF_UP);
            };
        }

        private final Set<Characteristics> CHARACTERISTICS = new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED));

        @Override
        public Set<Characteristics> characteristics() {
            return CHARACTERISTICS;
        }

    };

It all works well in the non-parallel case. However, when I use a parallelStream(), it sometimes gives the wrong result. For example, given the values from 1 to 10, it computes 53/9 instead of 55/10. When debugging, the debugger never hits the breakpoint in the combiner() function. Is there some kind of flag that I need to set?

Martin Boyanov asked Jun 24 '16 08:06


2 Answers

It looks like the problem is the CONCURRENT characteristic, which does something other than what you might expect:

Indicates that this collector is concurrent, meaning that the result container can support the accumulator function being called concurrently with the same result container from multiple threads.

Instead of calling the combiner, the stream calls the accumulator concurrently, using the same BigDecimal[] a for all threads. The read-modify-write on a is not atomic, so updates can get lost:

Thread1 -> retrieves value of a[0]: 3
Thread2 -> retrieves value of a[0]: 3
Thread1 -> adds own value: 3 + 3 = 6
Thread2 -> adds own value: 3 + 4 = 7
Thread1 -> writes 6 to a[0]
Thread2 -> writes 7 to a[0]

This leaves a[0] at 7 when it should be 10. The same kind of thing can happen with a[1], so the results can be inconsistent.
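One way to observe that the combiner is skipped is to count its invocations. The sketch below is not the asker's collector: it swaps in a LongAdder as the result container, because that type really is thread-safe and so CONCURRENT is a legitimate claim for it. The class and method names are made up for illustration. Skipping the combiner for a CONCURRENT, UNORDERED collector on a parallel stream is what OpenJDK's stream implementation does; the API documentation only says an implementation is free to reduce this way.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collector;
import java.util.stream.Collector.Characteristics;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of the question's code.
final class CombinerCallDemo {

    // Returns {sum, number of combiner calls} for a parallel sum of 1..n.
    static long[] sumAndCombinerCalls(int n, boolean concurrent) {
        AtomicInteger combinerCalls = new AtomicInteger();

        Characteristics[] flags = concurrent
                ? new Characteristics[] { Characteristics.CONCURRENT, Characteristics.UNORDERED }
                : new Characteristics[] { Characteristics.UNORDERED };

        Collector<Integer, LongAdder, Long> collector = Collector.of(
                LongAdder::new,
                // LongAdder.add is thread-safe, so sharing one container is fine here.
                (adder, i) -> adder.add(i),
                (left, right) -> {
                    combinerCalls.incrementAndGet();
                    left.add(right.sum());
                    return left;
                },
                LongAdder::sum,
                flags);

        long sum = IntStream.rangeClosed(1, n).boxed().parallel().collect(collector);
        return new long[] { sum, combinerCalls.get() };
    }

    public static void main(String[] args) {
        long[] withFlag = sumAndCombinerCalls(1000, true);
        long[] withoutFlag = sumAndCombinerCalls(1000, false);
        System.out.println("CONCURRENT:     sum=" + withFlag[0] + ", combiner calls=" + withFlag[1]);
        System.out.println("not CONCURRENT: sum=" + withoutFlag[0] + ", combiner calls=" + withoutFlag[1]);
    }
}
```

Both runs produce the same sum because the container tolerates concurrent updates. A plain BigDecimal[] does not, which is why the question's collector loses updates.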


If you remove the CONCURRENT characteristic, the combiner will get used instead.
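A minimal sketch of the corrected collector, assuming the rest of the logic stays as in the question. It uses the Collector.of factory in place of the anonymous class, which is a stylistic substitution and not something the fix requires. The BoltAggregationData class here is a hypothetical stand-in: only its getValue() method is taken from the question. The helper averageOfRange is also invented for the demo.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.stream.Collector;
import java.util.stream.IntStream;

final class AvgCollectorDemo {

    // Same supplier, accumulator, combiner and finisher as in the question,
    // but only UNORDERED is declared. Without CONCURRENT, each thread gets
    // its own BigDecimal[] and the partial results are merged by the combiner.
    static final Collector<BoltAggregationData, BigDecimal[], BigDecimal> AVG_COLLECTOR =
            Collector.of(
                    () -> new BigDecimal[] { BigDecimal.ZERO, BigDecimal.ZERO },
                    (a, b) -> {
                        a[0] = a[0].add(b.getValue());   // running sum
                        a[1] = a[1].add(BigDecimal.ONE); // running count
                    },
                    (a, b) -> {
                        a[0] = a[0].add(b[0]);
                        a[1] = a[1].add(b[1]);
                        return a;
                    },
                    // Like the original, this throws ArithmeticException on an empty stream.
                    a -> a[0].divide(a[1], 6, RoundingMode.HALF_UP),
                    Collector.Characteristics.UNORDERED);

    // Hypothetical helper: averages the integers from..to on a parallel stream.
    static BigDecimal averageOfRange(int from, int to) {
        return IntStream.rangeClosed(from, to)
                .parallel()
                .mapToObj(i -> new BoltAggregationData(BigDecimal.valueOf(i)))
                .collect(AVG_COLLECTOR);
    }

    public static void main(String[] args) {
        System.out.println(averageOfRange(1, 10)); // prints 5.500000
    }
}

// Hypothetical stand-in for the question's POJO.
final class BoltAggregationData {
    private final BigDecimal value;

    BoltAggregationData(BigDecimal value) {
        this.value = value;
    }

    BigDecimal getValue() {
        return value;
    }
}
```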

Jorn Vernee answered Nov 06 '22 15:11


Well, that’s exactly what you request when specifying Characteristics.CONCURRENT:

Indicates that this collector is concurrent, meaning that the result container can support the accumulator function being called concurrently with the same result container from multiple threads.

If that’s not the case, as with your Collector, you shouldn’t specify that flag.


As a side note, new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED)); is quite inefficient for specifying characteristics. You can just use EnumSet.of(Characteristics.CONCURRENT, Characteristics.UNORDERED). Once you remove the incorrect CONCURRENT characteristic, you may use either EnumSet.of(Characteristics.UNORDERED) or Collections.singleton(Characteristics.UNORDERED), but a HashSet definitely is overkill.
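The two alternatives could look roughly like this. The class and method names are made up for the example. Wrapping the EnumSet in Collections.unmodifiableSet is an addition, not something this answer asks for: the Collector.characteristics() documentation says the returned set should be immutable, and an EnumSet on its own is mutable.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;
import java.util.stream.Collector.Characteristics;

// Hypothetical demo class showing two lightweight ways to build the set.
final class CharacteristicsDemo {

    // EnumSet is a compact, enum-specific Set; the wrapper makes it read-only.
    static Set<Characteristics> viaEnumSet() {
        return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED));
    }

    // Collections.singleton returns an immutable one-element set.
    static Set<Characteristics> viaSingleton() {
        return Collections.singleton(Characteristics.UNORDERED);
    }

    public static void main(String[] args) {
        System.out.println(viaEnumSet());   // prints [UNORDERED]
        System.out.println(viaSingleton()); // prints [UNORDERED]
    }
}
```

Either set can be stored in a static final field and returned from characteristics().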

Holger answered Nov 06 '22 15:11