
Java Streams - grouping items on sorted streams efficiently

I am looking for a way to implement a non-terminal grouping operation with minimal memory overhead.

For example, consider distinct(). In the general case, it has no choice but to keep track of every distinct item seen so far while streaming them forward. However, if we know that the input stream is already sorted, the operation could be done "on-the-fly", remembering only the previous item.

I know I can achieve this for iterators by using an iterator wrapper and implementing the grouping logic myself. Is there a simpler way to implement this with the Streams API instead?
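For reference, the iterator-wrapper approach mentioned above might look roughly like the following minimal sketch. The class name `DedupIterator` is hypothetical, and the sketch assumes the source iterator is already sorted (or at least that equal items are adjacent), so only the previous item has to be remembered.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;

// Hypothetical wrapper: skips elements equal to the previously delivered one.
class DedupIterator<T> implements Iterator<T> {
    private final Iterator<T> source;
    private T nextItem;       // element waiting to be delivered
    private boolean ready;    // true when nextItem holds an undelivered element
    private T lastItem;       // last element that was delivered
    private boolean started;  // true once at least one element was delivered

    DedupIterator(Iterator<T> source) {
        this.source = source;
    }

    @Override
    public boolean hasNext() {
        // Read ahead until an element differing from the last delivered one is found.
        while (!ready && source.hasNext()) {
            T candidate = source.next();
            if (!started || !Objects.equals(candidate, lastItem)) {
                nextItem = candidate;
                ready = true;
            }
        }
        return ready;
    }

    @Override
    public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        ready = false;
        started = true;
        lastItem = nextItem;
        return nextItem;
    }

    public static void main(String[] args) {
        Iterator<Integer> it =
            new DedupIterator<>(Arrays.asList(1, 1, 3, 3, 3, 4, 4, 5).iterator());
        it.forEachRemaining(System.out::println);
    }
}
```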

--EDIT--

I found a way to abuse Stream.flatMap(..) to achieve this:

  private static class DedupSeq implements IntFunction<IntStream> {
    private Integer prev;

    @Override
    public IntStream apply(int value) {
      IntStream res = (prev != null && value == prev)? IntStream.empty() : IntStream.of(value);
      prev = value;
      return res;
    }    
  }

And then:

IntStream.of(1,1,3,3,3,4,4,5).flatMap(new DedupSeq()).forEach(System.out::println);

Which prints:

1
3
4
5

With some changes, the same technique can be used for any kind of memory-efficient sequence grouping of streams. Still, I don't much like this solution, and I was looking for something more natural (like the way mapping or filtering work, for example). Furthermore, I'm breaking the contract here, because the function supplied to flatMap(..) is stateful.

Eyal Schneider asked Apr 12 '15 09:04





1 Answer

If you want a solution that doesn’t add mutable state to a function that isn’t supposed to have it, you may resort to collect:

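static void distinctForSorted(IntStream s, IntConsumer action) {
    // a[0] holds the last value seen; it is a long so that the initial Long.MIN_VALUE never equals an int
    s.collect(()->new long[]{Long.MIN_VALUE},
              (a, i)->{ if(a[0]!=i) { action.accept(i); assert i>a[0]; a[0]=i; }},
              (a, b)->{ throw new UnsupportedOperationException(); }); // merging partial results is unsupported
}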

This works because it is the intended way of using mutable containers. However, it can’t work in parallel, as splitting at arbitrary stream positions implies that the same value may be encountered in two (or even more) threads.

If you want a general-purpose IntStream rather than a forEach action, a low-level Spliterator solution is preferable, despite the added complexity.

static IntStream distinctForSorted(IntStream s) {
    Spliterator.OfInt sp=s.spliterator();
    return StreamSupport.intStream(
      new Spliterators.AbstractIntSpliterator(sp.estimateSize(),
      Spliterator.DISTINCT|Spliterator.SORTED|Spliterator.NONNULL|Spliterator.ORDERED) {
        long last=Long.MIN_VALUE;
        @Override
        public boolean tryAdvance(IntConsumer action) {
            long prev=last;
            // pull from the source until a new value has been passed on (i.e. last has changed)
            do if(!sp.tryAdvance(distinct(action))) return false; while(prev==last);
            return true;
        }
        @Override
        public void forEachRemaining(IntConsumer action) {
            sp.forEachRemaining(distinct(action));
        }
        @Override
        public Comparator<? super Integer> getComparator() {
            return null; // null signals natural ordering for a SORTED spliterator
        }
        private IntConsumer distinct(IntConsumer c) {
            return i-> {
                if(i==last) return;
                assert i>last;
                last=i;
                c.accept(i);
            };
        }
    }, false);
}

It even inherits parallel support, though this works by prefetching some values before processing them in another thread. So it won’t accelerate the distinct operation itself, but it may speed up follow-up operations if they are computationally intensive.
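Since the question asks about grouping in general, the same Spliterator idea could presumably be extended from distinct to grouping runs of equal elements. The following is only a sketch under assumptions: the helper name `groupRuns` is hypothetical, the input is assumed to be sorted (so equal elements are adjacent), and equality is decided with `Objects.equals`. It reads one element ahead and keeps a single run in memory at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class SortedGrouping {
    // Hypothetical helper, not part of the JDK: turns a sorted stream into a
    // stream of runs of equal elements.
    static <T> Stream<List<T>> groupRuns(Stream<T> s) {
        Spliterator<T> sp = s.spliterator();
        return StreamSupport.stream(
            new Spliterators.AbstractSpliterator<List<T>>(
                    sp.estimateSize(), Spliterator.ORDERED | Spliterator.NONNULL) {
                private T pending;          // first element of the next run, read ahead
                private boolean hasPending; // true when pending holds such an element

                @Override
                public boolean tryAdvance(Consumer<? super List<T>> action) {
                    List<T> run = new ArrayList<>();
                    if (hasPending) {
                        run.add(pending);
                        pending = null;
                        hasPending = false;
                    }
                    // Pull elements until one differs from the head of the current run.
                    while (!hasPending && sp.tryAdvance(t -> {
                        if (run.isEmpty() || Objects.equals(run.get(0), t)) {
                            run.add(t);
                        } else {
                            pending = t;
                            hasPending = true;
                        }
                    })) { /* keep pulling */ }
                    if (run.isEmpty()) return false; // source exhausted
                    action.accept(run);
                    return true;
                }
            }, false);
    }

    public static void main(String[] args) {
        // prints [[1, 1], [3, 3, 3], [4, 4], [5]]
        System.out.println(groupRuns(Stream.of(1, 1, 3, 3, 3, 4, 4, 5))
                .collect(Collectors.toList()));
    }
}
```

Each run is still materialized as a list, so memory use is bounded by the longest run rather than by the whole stream.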


For completeness, here is a distinct operation for arbitrary, i.e. unsorted, IntStreams which doesn’t rely on “boxing plus HashMap” and thus may have a much better memory footprint:

static IntStream distinct(IntStream s) {
    boolean parallel=s.isParallel();
    s=s.collect(BitSet::new, BitSet::set, BitSet::or).stream();
    if(parallel) s=s.parallel();
    return s;
}

It works for non-negative int values only; expanding it to the full 32-bit range would require two BitSets and thus would not look as concise, but often the use case allows limiting the storage to the 31-bit range or even lower…
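A rough sketch of what the two-BitSet variant might look like is shown below. The holder class `IntSet` and its methods are hypothetical names introduced for illustration; negative values are stored in the second BitSet at index `~i`, which maps -1 to 0 and Integer.MIN_VALUE to Integer.MAX_VALUE.

```java
import java.util.Arrays;
import java.util.BitSet;
import java.util.stream.IntStream;

class FullRangeDistinct {
    // Hypothetical container: one BitSet per sign.
    static final class IntSet {
        final BitSet nonNegative = new BitSet();
        final BitSet negative = new BitSet(); // bit n represents the value ~n

        void add(int i) {
            if (i >= 0) nonNegative.set(i); else negative.set(~i);
        }

        void addAll(IntSet other) {
            nonNegative.or(other.nonNegative);
            negative.or(other.negative);
        }

        IntStream stream() {
            // Negative values come out first (from -1 downwards), then the non-negative ones.
            return IntStream.concat(negative.stream().map(n -> ~n), nonNegative.stream());
        }
    }

    static IntStream distinct(IntStream s) {
        boolean parallel = s.isParallel();
        IntStream result = s.collect(IntSet::new, IntSet::add, IntSet::addAll).stream();
        return parallel ? result.parallel() : result;
    }

    public static void main(String[] args) {
        // prints [-5, -1, 0, 3]
        System.out.println(Arrays.toString(
            distinct(IntStream.of(3, -1, 3, 0, -5, -1)).sorted().toArray()));
    }
}
```

As with the single-BitSet version, the result does not preserve encounter order, and the memory used grows with the largest magnitude stored, so values near the ends of the int range can make the BitSets large.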

Holger answered Nov 14 '22 22:11
