I am looking for a way to implement a non-terminal grouping operation, such that the memory overhead will be minimal.
For example, consider distinct(). In the general case, it has no choice but to collect all distinct items, and only then stream them forward. However, if we know that the input stream is already sorted, the operation could be done "on-the-fly", using minimal memory.
I know I can achieve this for iterators by writing an iterator wrapper and implementing the grouping logic myself. Is there a simpler way to implement this using the Streams API instead?
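For reference, the iterator-wrapper approach mentioned above might look roughly like the sketch below. The class name `DedupIterator` and its fields are hypothetical, and it assumes the source iterator is already sorted (or at least has equal elements adjacent); it holds only one element at a time.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;

// Hypothetical helper: skips consecutive duplicates of an already sorted iterator.
final class DedupIterator<T> implements Iterator<T> {
    private final Iterator<T> source;
    private T pending;          // next element to hand out, valid only if hasPending
    private boolean hasPending;
    private T last;             // most recently accepted element
    private boolean seenAny;    // false until the first element was accepted

    DedupIterator(Iterator<T> source) {
        this.source = source;
        fetch();
    }

    // Pulls from the source until an element differing from 'last' shows up.
    private void fetch() {
        hasPending = false;
        while (source.hasNext()) {
            T candidate = source.next();
            if (!seenAny || !Objects.equals(candidate, last)) {
                seenAny = true;
                last = candidate;
                pending = candidate;
                hasPending = true;
                return;
            }
        }
    }

    @Override
    public boolean hasNext() {
        return hasPending;
    }

    @Override
    public T next() {
        if (!hasPending) throw new NoSuchElementException();
        T result = pending;
        fetch();
        return result;
    }
}
```

Wrapping `Arrays.asList(1,1,3,3,3,4,4,5).iterator()` this way yields 1, 3, 4, 5 without buffering the input.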
--EDIT--
I found a way to abuse Stream.flatMap(..) to achieve this:
private static class DedupSeq implements IntFunction<IntStream> {
    private Integer prev;

    @Override
    public IntStream apply(int value) {
        IntStream res = (prev != null && value == prev) ? IntStream.empty() : IntStream.of(value);
        prev = value;
        return res;
    }
}
And then:
IntStream.of(1,1,3,3,3,4,4,5).flatMap(new DedupSeq()).forEach(System.out::println);
Which prints:
1
3
4
5
With some changes, the same technique can be used for any kind of memory-efficient sequence grouping of streams. Still, I don't much like this solution, and I was looking for something more natural (like the way mapping or filtering work, for example). Furthermore, I'm breaking the contract here, because the function supplied to flatMap(..) is stateful.
If you want a solution that doesn’t add mutable state to a function that isn’t supposed to have it, you may resort to collect:
static void distinctForSorted(IntStream s, IntConsumer action) {
    s.collect(() -> new long[]{Long.MIN_VALUE},
        (a, i) -> { if (a[0] != i) { action.accept(i); assert i > a[0]; a[0] = i; } },
        (a, b) -> { throw new UnsupportedOperationException(); });
}
This works because it is the intended way of using mutable containers. However, it can’t work in parallel, as splitting at arbitrary stream positions implies the possibility of encountering the same value in two (or even more) threads.
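As a quick usage sketch, the collect-based method can feed any IntConsumer, for example one that appends to a list. The wrapper class `DistinctForSortedDemo` and the `demo()` method are hypothetical names; the method itself is repeated only so the snippet compiles on its own, and the input is assumed to be sorted and sequential.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

class DistinctForSortedDemo {
    // Same method as above, repeated so the sketch is self-contained.
    static void distinctForSorted(IntStream s, IntConsumer action) {
        s.collect(() -> new long[]{Long.MIN_VALUE},
            (a, i) -> { if (a[0] != i) { action.accept(i); assert i > a[0]; a[0] = i; } },
            (a, b) -> { throw new UnsupportedOperationException(); });
    }

    // Collects the distinct values of a sorted, sequential stream into a list.
    static List<Integer> demo() {
        List<Integer> seen = new ArrayList<>();
        distinctForSorted(IntStream.of(1, 1, 3, 3, 3, 4, 4, 5), seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only the single-element long[] holder is kept as state, so the memory overhead stays constant regardless of the stream length.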
If you want a general-purpose IntStream rather than a forEach action, a low-level Spliterator solution is preferred, despite the added complexity.
static IntStream distinctForSorted(IntStream s) {
    Spliterator.OfInt sp = s.spliterator();
    return StreamSupport.intStream(
        new Spliterators.AbstractIntSpliterator(sp.estimateSize(),
            Spliterator.DISTINCT|Spliterator.SORTED|Spliterator.NONNULL|Spliterator.ORDERED) {
            // a long sentinel is smaller than every int, so the first element always passes
            long last = Long.MIN_VALUE;

            @Override
            public boolean tryAdvance(IntConsumer action) {
                long prev = last;
                // keep pulling from the source until a new value has been emitted
                do if(!sp.tryAdvance(distinct(action))) return false; while(prev == last);
                return true;
            }

            @Override
            public void forEachRemaining(IntConsumer action) {
                sp.forEachRemaining(distinct(action));
            }

            @Override
            public Comparator<? super Integer> getComparator() {
                return null; // null means natural order
            }

            // wraps the consumer so that values equal to the previous one are dropped
            private IntConsumer distinct(IntConsumer c) {
                return i -> {
                    if(i == last) return;
                    assert i > last;
                    last = i;
                    c.accept(i);
                };
            }
        }, false);
}
It even inherits parallel support, though that works by prefetching some values before processing them in another thread, so it won’t accelerate the distinct operation itself. It may speed up follow-up operations, though, if they are computationally intensive.
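The same Spliterator idea should carry over to object streams. The following is only a sketch under assumptions: the class name `SortedDistinct` is hypothetical, equal elements are assumed to be adjacent (e.g. because the input is sorted), equality is decided by `Objects.equals`, and only the ORDERED characteristic is reported to stay on the safe side.

```java
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class SortedDistinct {
    // Hypothetical generic variant: drops elements equal to their predecessor.
    static <T> Stream<T> distinctForSorted(Stream<T> s) {
        Spliterator<T> sp = s.spliterator();
        return StreamSupport.stream(
            new Spliterators.AbstractSpliterator<T>(sp.estimateSize(), Spliterator.ORDERED) {
                private T last;           // previously emitted element
                private boolean seenAny;  // false until the first element was emitted

                @Override
                public boolean tryAdvance(Consumer<? super T> action) {
                    boolean[] emitted = {false};
                    // pull from the source until one new element has been passed on
                    while (!emitted[0]) {
                        boolean advanced = sp.tryAdvance(t -> {
                            if (seenAny && Objects.equals(t, last)) return;
                            seenAny = true;
                            last = t;
                            action.accept(t);
                            emitted[0] = true;
                        });
                        if (!advanced) return false; // source exhausted
                    }
                    return true;
                }
            }, false).onClose(s::close);
    }

    public static void main(String[] args) {
        distinctForSorted(Stream.of("a", "a", "b", "c", "c")).forEach(System.out::println);
    }
}
```

As with the int version, the only state is the previously emitted element, so nothing is buffered.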
For completeness, here is a distinct operation for arbitrary, i.e. unsorted, IntStreams which doesn’t rely on “boxing plus HashMap” and thus may have a much better memory footprint:
static IntStream distinct(IntStream s) {
    boolean parallel = s.isParallel();
    s = s.collect(BitSet::new, BitSet::set, BitSet::or).stream();
    if(parallel) s = s.parallel();
    return s;
}
It works for non-negative int values only (BitSet rejects negative indices); expanding it to the full 32-bit range would require two BitSets and thus would not look as concise, but often the use case allows limiting the storage to the 31-bit range or even lower…
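One possible way to sketch the two-BitSet variant is shown below. The names `FullRangeDistinct` and `Bits` are hypothetical, and storing a negative value i at index ~i is just one mapping choice, not something the answer prescribes. Note that values near Integer.MAX_VALUE or Integer.MIN_VALUE make the BitSets grow to hundreds of megabytes, and that the result is not sorted: negatives come out first in descending order, followed by the non-negative values in ascending order.

```java
import java.util.BitSet;
import java.util.stream.IntStream;

class FullRangeDistinct {
    // Hypothetical container: one BitSet for i >= 0, one for i < 0 (stored at index ~i).
    static final class Bits {
        final BitSet nonNegative = new BitSet();
        final BitSet negative = new BitSet();

        void add(int i) {
            if (i >= 0) nonNegative.set(i);
            else negative.set(~i);  // ~i maps -1 to 0, -2 to 1, and so on
        }

        void merge(Bits other) {
            nonNegative.or(other.nonNegative);
            negative.or(other.negative);
        }

        IntStream stream() {
            // map the negative indices back with ~, then append the non-negative values
            return IntStream.concat(negative.stream().map(b -> ~b), nonNegative.stream());
        }
    }

    static IntStream distinct(IntStream s) {
        boolean parallel = s.isParallel();
        IntStream result = s.collect(Bits::new, Bits::add, Bits::merge).stream();
        return parallel ? result.parallel() : result;
    }

    public static void main(String[] args) {
        distinct(IntStream.of(3, -1, 3, 0, -5, -1)).sorted().forEach(System.out::println);
    }
}
```

Appending sorted() restores ascending order if the caller needs it, at the cost of buffering the distinct values once more.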