
Stream stateful computation: cumulative sums

Assuming I have a Java IntStream, is it possible to convert it to an IntStream with cumulative sums? For example, a stream starting with [4, 2, 6, ...] should be converted to [4, 6, 12, ...].

More generally, how should one go about implementing stateful stream operations? It feels like this should be possible:

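// Requires: import java.util.function.IntUnaryOperator;
myIntStream.map(new IntUnaryOperator() {
    int sum = 0; // running total kept between calls

    @Override
    public int applyAsInt(int value) {
        return sum += value;
    }
});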

The obvious restriction is that this works only on sequential streams. However, Stream.map explicitly requires a stateless mapping function. Am I right to miss a Stream.statefulMap or Stream.cumulative operation, or is that missing the point of Java streams?

Compare for example to Haskell, where the scanl1 function solves exactly this example:

scanl1 (+) [1, 2, 3, 4] == [1, 3, 6, 10]
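For comparison, a scanl1-like operation could be sketched in Java roughly as follows. This is only a sketch under assumptions: the class Scan and its scan method are hypothetical names, not JDK API, and the state lives in an iterator rather than in a map function, so the result is deliberately a sequential stream.

```java
import java.util.PrimitiveIterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.IntBinaryOperator;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

// Hypothetical helper, not part of the JDK: a lazy, sequential-only scan.
class Scan {
    static IntStream scan(IntStream source, IntBinaryOperator op) {
        PrimitiveIterator.OfInt input = source.iterator();
        PrimitiveIterator.OfInt output = new PrimitiveIterator.OfInt() {
            private boolean first = true; // no accumulated value yet
            private int acc;              // running result of op so far

            @Override
            public boolean hasNext() {
                return input.hasNext();
            }

            @Override
            public int nextInt() {
                int value = input.nextInt();
                // The first element passes through unchanged, like scanl1.
                acc = first ? value : op.applyAsInt(acc, value);
                first = false;
                return acc;
            }
        };
        // Wrap the iterator in a new sequential stream (parallel = false).
        return StreamSupport.intStream(
                Spliterators.spliteratorUnknownSize(output, Spliterator.ORDERED), false)
            .onClose(source::close);
    }

    public static void main(String[] args) {
        // Prints 1, 3, 6, 10 on separate lines.
        scan(IntStream.of(1, 2, 3, 4), Integer::sum).forEach(System.out::println);
    }
}
```

Because elements are pulled one at a time, this should also work on an infinite source as long as a short-circuiting operation such as limit follows.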
Adrian Leonhard asked Feb 05 '15 23:02



2 Answers

You can do this with an atomic number. For example:

import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

public class Accumulator {
    public static LongStream toCumulativeSumStream(IntStream ints){
        AtomicLong sum = new AtomicLong(0);
        return ints.sequential().mapToLong(sum::addAndGet);
    }

    public static void main(String[] args){
        LongStream sums = Accumulator.toCumulativeSumStream(IntStream.range(1, 5));
        sums.forEachOrdered(System.out::println);
    }
}

This outputs:

1
3
6
10

I've used a long to store the sums because two ints can easily add up to more than Integer.MAX_VALUE, and a long is far less likely to overflow.
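To make the overflow point concrete, here is a small sketch comparing an int running total with a long one. The class OverflowDemo and its two methods are made-up names used only for illustration.

```java
import java.util.stream.IntStream;

// Hypothetical demo class: shows int wrap-around versus a long total.
class OverflowDemo {
    // Sums into an int, so the result silently wraps around on overflow.
    static int intTotal(int... values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    // Widens each value to long before summing.
    static long longTotal(int... values) {
        return IntStream.of(values).asLongStream().sum();
    }

    public static void main(String[] args) {
        System.out.println(intTotal(Integer.MAX_VALUE, 1));  // prints -2147483648
        System.out.println(longTotal(Integer.MAX_VALUE, 1)); // prints 2147483648
    }
}
```

If wrapping should be an error rather than a silent wrong answer, Math.addExact throws an ArithmeticException on overflow instead.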

Steve K answered Nov 15 '22 15:11



It's possible to do this with a collector whose result is then turned into a new stream:

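// Requires: import java.util.List;
class Accumulator {
    // Appends the running total: the new value plus the last total so far.
    public static void accept(List<Integer> list, Integer value) {
        list.add(value + (list.isEmpty() ? 0 : list.get(list.size() - 1)));
    }

    // Merges two partial results by shifting list2 by list1's final total.
    public static List<Integer> combine(List<Integer> list1, List<Integer> list2) {
        // An empty left-hand partial result contributes an offset of 0.
        int total = list1.isEmpty() ? 0 : list1.get(list1.size() - 1);
        list2.stream().map(n -> n + total).forEach(list1::add);
        return list1;
    }
}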

This is used as:

myIntStream.parallel()
    .collect(ArrayList<Integer>::new, Accumulator::accept, Accumulator::combine)
    .stream();

The important property of this collector is that it stays correct even when the stream is parallel: when two partial results are combined, every total in the second list is offset by the final total of the first.
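As a worked example of that combine step, suppose the input 4, 2, 6, 1 is split into two halves that are accumulated independently. The sketch below repeats the merge rule in a standalone class (CombineDemo is a hypothetical name) and returns a fresh list instead of mutating the left one, purely to keep the demonstration easy to follow.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class illustrating the merge of two partial prefix sums.
class CombineDemo {
    // Shifts every total in 'right' by the final total of 'left'.
    static List<Integer> combine(List<Integer> left, List<Integer> right) {
        int offset = left.isEmpty() ? 0 : left.get(left.size() - 1);
        List<Integer> merged = new ArrayList<>(left);
        for (int n : right) {
            merged.add(n + offset);
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Integer> left = Arrays.asList(4, 6);  // prefix sums of 4, 2
        List<Integer> right = Arrays.asList(6, 7); // prefix sums of 6, 1
        System.out.println(combine(left, right));  // prints [4, 6, 12, 13]
    }
}
```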

The collector approach is obviously not as efficient as a map operation, because it collects the whole stream and then produces a new one. But that's not just an implementation detail: it's a necessary consequence of the fact that streams are designed to be potentially processed concurrently.

I have tested it with IntStream.range(0, 10000).parallel() and it functions correctly.
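If materializing the whole stream is acceptable anyway, a different technique from the collector above is to dump the values into an array and let Arrays.parallelPrefix compute the running totals in place. The following is a minimal sketch of that alternative; the class PrefixSums and the method cumulativeSums are illustrative names, and it assumes the totals fit in an int.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

// Hypothetical helper: cumulative sums via Arrays.parallelPrefix.
class PrefixSums {
    static IntStream cumulativeSums(IntStream source) {
        int[] values = source.toArray();             // materializes the stream
        Arrays.parallelPrefix(values, Integer::sum); // in-place running totals
        return Arrays.stream(values);
    }

    public static void main(String[] args) {
        int[] sums = cumulativeSums(IntStream.range(0, 10000).parallel()).toArray();

        // Cross-check against a plain sequential loop.
        int expected = 0;
        for (int i = 0; i < sums.length; i++) {
            expected += i;
            if (sums[i] != expected) {
                throw new AssertionError("mismatch at index " + i);
            }
        }
        System.out.println(sums[sums.length - 1]); // prints 49995000
    }
}
```

The operator passed to parallelPrefix has to be associative, which addition is; the same requirement is what makes the combine step of the collector work.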

sprinter answered Nov 15 '22 15:11
