Is something like the following possible in Java? I'd like to count all the elements of my filtered stream and, as a side effect, store the first 10:
stream().filter(myFilter) // Reduces the input to the subsequent operations
.limit(10) // Limits the stream to ten elements
.peek(myList::add) // Stores those ten elements in a list
.count(); // Here is the difficult part: I'd like to count all the elements that pass the filter, not just the 10 I am fetching
EDIT: I left this too implicit, but the goal is of course the fastest possible solution, or at least one faster than calling the stream generator twice and doing both operations separately:
List<Entity> entities = stream().filter(myFilter)
.limit(10)
.collect(Collectors.toList());
long entitiesCount = stream().filter(myFilter)
.count();
... taking advantage of a single iteration, and without having to load the whole collection into memory. I'm testing the answers with parallel streams.
A custom collector is the answer here:
Entry<List<Integer>, Integer> result = list.stream()
.collect(Collector.of(
() -> new SimpleEntry<>(new ArrayList<>(), 0),
(l, x) -> {
if (l.getKey().size() < 10) {
l.getKey().add(x);
}
l.setValue(l.getValue() + 1);
},
(left, right) -> {
List<Integer> leftList = left.getKey();
List<Integer> rightList = right.getKey();
while (leftList.size() < 10 && rightList.size() > 0) {
leftList.add(rightList.remove(0));
}
left.setValue(left.getValue() + right.getValue());
return left;
}));
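The collector above can also be packaged as a reusable factory method. The following is a minimal sketch rather than part of the original answer: the class name FirstNAndCount, the method name firstNAndCount, and the use of Long for the count are assumptions made for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Collector;
import java.util.stream.IntStream;

// Hypothetical helper class; the names are illustrative, not from the answer.
class FirstNAndCount {

    // Collects the first n elements (in encounter order) together with the total count.
    static <T> Collector<T, ?, Entry<List<T>, Long>> firstNAndCount(int n) {
        return Collector.<T, Entry<List<T>, Long>>of(
                () -> new SimpleEntry<List<T>, Long>(new ArrayList<>(), 0L),
                (acc, x) -> {
                    if (acc.getKey().size() < n) {
                        acc.getKey().add(x);
                    }
                    acc.setValue(acc.getValue() + 1);
                },
                (left, right) -> {
                    // Top up the left list from the right one, preserving order.
                    List<T> rightList = right.getKey();
                    int room = n - left.getKey().size();
                    left.getKey().addAll(rightList.subList(0, Math.min(room, rightList.size())));
                    left.setValue(left.getValue() + right.getValue());
                    return left;
                });
    }

    public static void main(String[] args) {
        Entry<List<Integer>, Long> result = IntStream.range(0, 100)
                .boxed()
                .filter(i -> i % 2 == 0)
                .collect(firstNAndCount(10));
        // Prints: 50 [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
        System.out.println(result.getValue() + " " + result.getKey());
    }
}
```

Because the collector declares no UNORDERED or CONCURRENT characteristic, a parallel stream over an ordered source should give the same list and count as the sequential one.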
Suppose you have this code:
Set.of(1, 2, 3, 4)
.stream()
.parallel()
.collect(Collector.of(
ArrayList::new,
(list, ele) -> {
System.out.println("Called accumulator");
list.add(ele);
},
(left, right) -> {
System.out.println("Combiner called");
left.addAll(right);
return left;
},
new Characteristics[] { Characteristics.CONCURRENT }));
Before we start thinking about that code (how correct it is does not matter for the purpose of the example), we need to read a bit of the documentation for the CONCURRENT characteristic:
If a CONCURRENT collector is not also UNORDERED, then it should only be evaluated concurrently if applied to an unordered data source.
What this documentation basically says is that if your collector is CONCURRENT and the source of the stream is unordered (like a Set), or we explicitly call unordered(), then the combiner will never get called.
If you run the previous code, you will see that "Combiner called" is never present in the output.
If you change Set.of(1, 2, 3, 4) to List.of(1, 2, 3, 4), you will see a different picture (ignore the correctness of the result you get; ArrayList is not thread-safe, but that is not the point). If the source of the stream is a List and you also call unordered(), you will again see that only the accumulator is called, that is:
List.of(1, 2, 3, 4)
.stream()
.unordered()
.parallel()
.collect(Collector.of(
ArrayList::new,
(list, ele) -> {
System.out.println("Called accumulator");
list.add(ele);
},
(left, right) -> {
System.out.println("Combiner called");
left.addAll(right);
return left;
},
new Characteristics[] { Characteristics.CONCURRENT }));
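To check this behaviour without relying on console output or on a non-thread-safe ArrayList, the combiner invocations can be counted. This is only a sketch: the class ConcurrentCollectorDemo and the method combinerCalls are hypothetical names, a ConcurrentLinkedQueue stands in as the thread-safe container, and the zero-combiner behaviour is what OpenJDK does in practice rather than something the specification promises.

```java
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.Collector.Characteristics;
import java.util.stream.Stream;

// Hypothetical demo class; the names are illustrative.
class ConcurrentCollectorDemo {

    // Runs the stream in parallel through a CONCURRENT collector backed by a
    // thread-safe queue and returns how many times the combiner was invoked.
    static int combinerCalls(Stream<Integer> stream) {
        AtomicInteger calls = new AtomicInteger();
        stream.parallel().collect(Collector.<Integer, Queue<Integer>>of(
                () -> new ConcurrentLinkedQueue<Integer>(),
                (queue, x) -> queue.add(x),
                (left, right) -> {
                    calls.incrementAndGet();
                    left.addAll(right);
                    return left;
                },
                Characteristics.CONCURRENT));
        return calls.get();
    }

    public static void main(String[] args) {
        // Unordered source: all threads share one container, so no merging is needed.
        System.out.println("Set:              " + combinerCalls(Set.of(1, 2, 3, 4).stream()));
        // Ordered source made unordered explicitly: same effect.
        System.out.println("List + unordered: " + combinerCalls(List.of(1, 2, 3, 4).stream().unordered()));
        // Ordered source: partial containers are typically merged via the combiner.
        System.out.println("List:             " + combinerCalls(List.of(1, 2, 3, 4).stream()));
    }
}
```

On OpenJDK the first two lines should report 0 combiner calls, while the plain List case usually reports a positive number that depends on how the source is split.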
The following uses a mutable reduction with the help of a local class holding the summary.
The limit on the collected elements is enforced by keeping only the first 10 elements, both in the accumulator and in the combiner function.
Example using IntStream:
Stat result = IntStream.range(0, 100)
.boxed()
.filter(i -> i % 2 == 0)
.collect(() -> new Stat(0, new ArrayList<Integer>()),
(stat, integer) -> {
stat.count++;
if (stat.list.size() < 10) {
stat.list.add(integer);
}
},
(stat1, stat2) -> {
stat1.count += stat2.count; // merge the partial counts (needed for parallel streams)
stat1.list.addAll(stat2.list.subList(0, Math.min(stat2.list.size(),
10 - stat1.list.size())));
});
And here's the Stat class used in the stream (you can easily use something like Pair<Long, List<Integer>>):
private static class Stat {
long count;
List<Integer> list;
public Stat(long count, List<Integer> list) {
this.count = count;
this.list = list;
}
}
The above example results in [count=50,list=[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]]
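For readers who want to run this, here is a self-contained sketch of the same mutable reduction. The wrapper class StatDemo, the summarize method, and the toString format are assumptions added so the snippet compiles and prints on its own; the combiner here also sums the partial counts so that a parallel run gives the same result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical wrapper class so the snippet compiles on its own.
class StatDemo {

    static class Stat {
        long count;
        final List<Integer> list = new ArrayList<>();

        @Override
        public String toString() {
            return "[count=" + count + ",list=" + list + "]";
        }
    }

    // Counts the even numbers below bound and keeps the first 10 of them.
    static Stat summarize(int bound, boolean parallel) {
        IntStream source = IntStream.range(0, bound);
        if (parallel) {
            source = source.parallel();
        }
        return source.boxed()
                .filter(i -> i % 2 == 0)
                .collect(Stat::new,
                        (stat, i) -> {
                            stat.count++;
                            if (stat.list.size() < 10) {
                                stat.list.add(i);
                            }
                        },
                        (s1, s2) -> {
                            // Merge partial results: sum the counts, then top up the list.
                            s1.count += s2.count;
                            int room = 10 - s1.list.size();
                            s1.list.addAll(s2.list.subList(0, Math.min(room, s2.list.size())));
                        });
    }

    public static void main(String[] args) {
        // Both lines print: [count=50,list=[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]]
        System.out.println(summarize(100, false));
        System.out.println(summarize(100, true));
    }
}
```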
Here's a simple lambda expression that will add to your list any items, up to 10, that make it past your filter:
i -> {if (myList.size() < 10) myList.add(i);}
But you can't simply use count() on Stream:
An implementation may choose to not execute the stream pipeline (either sequentially or in parallel) if it is capable of computing the count directly from the stream source. In such cases no source elements will be traversed and no intermediate operations will be evaluated. Behavioral parameters with side-effects, which are strongly discouraged except for harmless cases such as debugging, may be affected.
In my case, using count(), peek() was not called because the elements were not traversed, and my list was empty.
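The effect described in the documentation can be observed with a small experiment. This is a sketch under assumptions: the class CountSkipDemo and the method seenWithCount are hypothetical names, and whether the unfiltered pipeline is actually skipped depends on the JDK (the documentation only says an implementation may do so; OpenJDK 9 and later typically does).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; the names are illustrative.
class CountSkipDemo {

    // Returns the elements observed by peek() when count() is the terminal operation.
    static List<Integer> seenWithCount(List<Integer> source, boolean withFilter) {
        List<Integer> seen = new ArrayList<>();
        if (withFilter) {
            // After a filter the size is no longer known up front,
            // so the elements have to be traversed to count them.
            source.stream().filter(i -> true).peek(seen::add).count();
        } else {
            // The size is known from the source, so the implementation
            // may return it directly and never run peek().
            source.stream().peek(seen::add).count();
        }
        return seen;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3);
        System.out.println("without filter: " + seenWithCount(source, false));
        System.out.println("with filter:    " + seenWithCount(source, true));
    }
}
```

With the filter in place the second line prints [1, 2, 3]; without it, the list is typically empty on recent OpenJDK releases, which is the situation described above.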
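Choose a simple reduction to count the elements instead. The two-argument reduce(0, (a, b) -> a + 1) only compiles for a Stream<Integer>; for other element types, use the three-argument form, which also supplies a combiner:
.reduce(0, (a, b) -> a + 1, Integer::sum);
My code:
int count = yourCollection.stream()
.filter(myFilter)
.peek(i -> {if (myList.size() < 10) myList.add(i);} )
.reduce(0, (a, b) -> a + 1, Integer::sum);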