
Count elements from Stream but consider only N for collecting

Is something like the following pipeline possible in Java? I'd like to count the elements of my filtered stream and, as a side effect, store the first 10.

stream().filter(myFilter)  // Restricts the elements seen by the following operations
        .limit(10)         // Truncates the stream to ten elements
        .peek(myList::add) // Stores those ten elements in a list
        .count();          // The difficult part: I'd like to count every element that passes the filter, not just the 10 I am collecting

EDIT: I left this too implicit, but the point is of course to find the fastest solution (faster, at least, than calling the stream generator twice and doing both operations separately):

List<Entity> entities = stream().filter(myFilter) 
                                .limit(10)
                                .collect(Collectors.toList());
long entitiesCount = stream().filter(myFilter) 
                             .count();

... taking advantage of a single iteration, and without having to load the whole collection into memory. I'm testing the answers with parallel streams.

Whimusical asked Oct 09 '18 17:10


3 Answers

A custom collector is the answer here:

Entry<List<Integer>, Integer> result = list.stream()
            .collect(Collector.of(
                    () -> new SimpleEntry<>(new ArrayList<>(), 0),
                    (l, x) -> {
                        if (l.getKey().size() < 10) {
                            l.getKey().add(x);
                        }
                        l.setValue(l.getValue() + 1);
                    },
                    (left, right) -> {
                        List<Integer> leftList = left.getKey();
                        List<Integer> rightList = right.getKey();
                        while (leftList.size() < 10 && rightList.size() > 0) {
                            leftList.add(rightList.remove(0));
                        }
                        left.setValue(left.getValue() + right.getValue());
                        return left;
                    }));
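
The same idea can be packaged as a reusable collector. What follows is only a minimal sketch and not part of the original answer: the class name FirstNAndCount, the holder type Result and the method firstNAndCount are made up for illustration, and a small mutable holder is assumed instead of SimpleEntry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.IntStream;

public class FirstNAndCount {

    /** Hypothetical mutable holder: the first elements seen plus a running total. */
    static final class Result<T> {
        final List<T> first = new ArrayList<>();
        long count;
    }

    /** Hypothetical helper: keeps at most n elements and counts every element. */
    static <T> Collector<T, Result<T>, Result<T>> firstNAndCount(int n) {
        return Collector.of(
                Result::new,
                (acc, x) -> {
                    // Keep the element only while there is room, but always count it
                    if (acc.first.size() < n) {
                        acc.first.add(x);
                    }
                    acc.count++;
                },
                (left, right) -> {
                    // Top the left list up to n elements, then sum the counts
                    int room = n - left.first.size();
                    left.first.addAll(right.first.subList(0, Math.min(room, right.first.size())));
                    left.count += right.count;
                    return left;
                });
    }

    public static void main(String[] args) {
        Result<Integer> result = IntStream.range(0, 100)
                .boxed()
                .parallel()
                .filter(i -> i % 2 == 0)
                .collect(firstNAndCount(10));
        System.out.println(result.count + " " + result.first);
    }
}
```

On an ordered source such as this range, the sketch should print 50 [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] with or without parallel(), because partial results of an ordered stream are combined in encounter order.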

Suppose you have this code:

Set.of(1, 2, 3, 4)
            .stream()
            .parallel()
            .collect(Collector.of(
                    ArrayList::new,
                    (list, ele) -> {
                        System.out.println("Called accumulator");
                        list.add(ele);
                    },
                    (left, right) -> {
                        System.out.println("Combiner called");
                        left.addAll(right);
                        return left;
                    },
                    new Characteristics[] { Characteristics.CONCURRENT }));

Before we start thinking about that code (it does not matter how correct it is for the purpose of the example), we need to read a bit of the documentation for the CONCURRENT characteristic:

If a CONCURRENT collector is not also UNORDERED, then it should only be evaluated concurrently if applied to an unordered data source.

What this documentation basically says is that if your collector is CONCURRENT and the source of the stream is unordered (like a Set), or we explicitly call unordered(), then the combiner will never get called: all threads accumulate into a single shared container.

If you run the previous code, you will see that "Combiner called" never appears in the output.

If you change Set.of(1, 2, 3, 4) to List.of(1, 2, 3, 4), you will see a different picture (ignore the correctness of the result you get: ArrayList is not thread-safe, but that is not the point). If the source of the stream is a List and you also call unordered(), you will again see that only the accumulator is called, that is:

List.of(1, 2, 3, 4)
            .stream()
            .unordered()
            .parallel()
            .collect(Collector.of(
                    ArrayList::new,
                    (list, ele) -> {
                        System.out.println("Called accumulator");
                        list.add(ele);
                    },
                    (left, right) -> {
                        System.out.println("Combiner called");
                        left.addAll(right);
                        return left;
                    },
                    new Characteristics[] { Characteristics.CONCURRENT }));
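
To observe this without the thread-safety problem of ArrayList, one option is to count the calls instead of printing them and to accumulate into a thread-safe queue. This is only a sketch under assumptions: the class ConcurrentCollectorDemo and its run method are hypothetical names, and the "combiner is skipped" outcome is what I would expect from the OpenJDK implementation rather than something every implementation must do.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.Collector.Characteristics;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ConcurrentCollectorDemo {

    /**
     * Hypothetical helper: collects size elements from an unordered parallel stream
     * with a CONCURRENT collector and returns
     * {accumulator calls, combiner calls, collected elements}.
     */
    static int[] run(int size) {
        AtomicInteger accumulatorCalls = new AtomicInteger();
        AtomicInteger combinerCalls = new AtomicInteger();

        // Thread-safe container, since all threads may share one instance
        Collector<Integer, Queue<Integer>, Queue<Integer>> collector = Collector.of(
                ConcurrentLinkedQueue::new,
                (queue, e) -> {
                    accumulatorCalls.incrementAndGet();
                    queue.add(e);
                },
                (left, right) -> {
                    combinerCalls.incrementAndGet();
                    left.addAll(right);
                    return left;
                },
                Characteristics.CONCURRENT);

        List<Integer> source = IntStream.range(0, size).boxed().collect(Collectors.toList());
        Queue<Integer> collected = source.stream()
                .unordered()
                .parallel()
                .collect(collector);

        return new int[] { accumulatorCalls.get(), combinerCalls.get(), collected.size() };
    }

    public static void main(String[] args) {
        int[] calls = run(1000);
        System.out.println("accumulator=" + calls[0] + " combiner=" + calls[1] + " size=" + calls[2]);
    }
}
```

With the unordered() call in place, the combiner counter should stay at zero on OpenJDK, while the accumulator runs once per element.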

Eugene answered Nov 01 '22 16:11


The following uses a mutable reduction with the help of a local class holding the summary.
The accumulator stops adding to the list once it holds 10 elements, and the combiner function only takes as many elements from the second partial result as are needed to reach 10.

Example using IntStream:

Stat result = IntStream.range(0, 100)
        .boxed()
        .filter(i -> i % 2 == 0)
        .collect(() -> new Stat(0, new ArrayList<Integer>()), 
            (stat, integer) -> {
                stat.count++;
                if (stat.list.size() < 10) {
                    stat.list.add(integer);
                }
            }, 
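            (stat1, stat2) -> {
                // Merge partial results: sum the counts and top the list up to 10 elements
                stat1.count += stat2.count;
                stat1.list.addAll(stat2.list.subList(0, Math.min(stat2.list.size(), 
                    10 - stat1.list.size())));
            });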

And here's the Stat class used in the stream (you can easily use something like Pair<Long, List<Integer>>):

private static class Stat {
    long count;
    List<Integer> list;

    public Stat(long count, List<Integer> list) {
        this.count = count;
        this.list = list;
    }
}

The above example results in [count=50,list=[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]]

ernest_k answered Nov 01 '22 14:11


Here's a simple lambda expression that will add to your list any items, up to 10, that make it past your filter:

i -> {if (myList.size() < 10) myList.add(i);}

But you can't simply use count() on Stream:

An implementation may choose to not execute the stream pipeline (either sequentially or in parallel) if it is capable of computing the count directly from the stream source. In such cases no source elements will be traversed and no intermediate operations will be evaluated. Behavioral parameters with side-effects, which are strongly discouraged except for harmless cases such as debugging, may be affected.

In my case, using count(), peek() was not called because the elements were not traversed, and my list was empty.

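Choose a simple reduction to count the elements instead. Since the stream elements are generally not Integers, use the three-argument form of reduce, which also takes a combiner for the partial counts:

.reduce(0, (a, b) -> a + 1, Integer::sum);

My code:

int count = yourCollection.stream()
    .filter(myFilter)
    .peek(i -> {if (myList.size() < 10) myList.add(i);} )
    .reduce(0, (a, b) -> a + 1, Integer::sum);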
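
A self-contained version of this approach might look like the sketch below. It is an illustration only: the class PeekAndCount and the method countAndKeepFirst are hypothetical names, the stream is assumed to be sequential (the list filled from peek() is not thread-safe), and the count uses the three-argument reduce(identity, accumulator, combiner) overload so that it compiles for any element type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class PeekAndCount {

    /**
     * Hypothetical helper: counts the elements that pass the filter and stores
     * the first n of them in sink. Intended for sequential streams only.
     */
    static <T> int countAndKeepFirst(Stream<T> source, Predicate<? super T> filter,
                                     List<? super T> sink, int n) {
        return source
                .filter(filter)
                // Side effect: remember the element while the sink has room
                .peek(x -> { if (sink.size() < n) sink.add(x); })
                // Count by adding one per element; Integer::sum merges partial counts
                .reduce(0, (count, x) -> count + 1, Integer::sum);
    }

    public static void main(String[] args) {
        List<String> firstTen = new ArrayList<>();
        Stream<String> items = IntStream.range(0, 100).mapToObj(i -> Integer.toString(i));
        int count = countAndKeepFirst(items, s -> s.length() == 2, firstTen, 10);
        System.out.println(count + " " + firstTen);
    }
}
```

For this input the filter keeps the two-digit strings "10" to "99", so the sketch should report a count of 90 and a list holding "10" through "19".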

rgettman answered Nov 01 '22 14:11