
Java 8 lambda: iterate over stream objects and use previous/next object(s) in stream

I am practising some entry-level Java 8 lambda functionality.

Given a list of messages, each containing a message offset, where all offsets must form a consecutive sequence of integers, I'm trying to find gaps to warn about. I get the feeling this should be quite doable with a nice lambda, but I can't get my head around it.

So, there's this working snippet:

private void warnAboutMessageGaps(final List<Message> messages) {

    final List<Long> offsets = messages.stream()
            .sorted(comparingLong(Message::getOffset))
            .map(Message::getOffset)
            .collect(toList())
            ;

    for (int i = 0; i < offsets.size() - 1; i++) {
        final long currentOffset = offsets.get(i);
        final long expectedNextOffset = offsets.get(i) + 1;
        final long actualNextOffset = offsets.get(i + 1);
        if (actualNextOffset != expectedNextOffset) {
            LOG.error("Missing offset(s) found in messages: missing from {} to {}", currentOffset + 1, actualNextOffset - 1);
        }
    }
}

What I can't figure out is how to make it so that I can do the "compare with previous/next object" in the lambda. Any pointers would be appreciated.

/edit: Suggestions about StreamEx and other third-party solutions, while appreciated, are not what I was looking for.

SadBunny asked Mar 15 '17 13:03


2 Answers

You can do it with StreamEx using a pairMap method:

StreamEx.of(messages)
        .sorted(Comparator.comparingLong(Message::getOffset))
        .pairMap((prev, next) -> new Message[] {prev, next})
        .forEach(prevNext -> {
            long currentOffset = prevNext[0].getOffset();
            long expectedNextOffset = prevNext[0].getOffset() + 1;
            long actualNextOffset = prevNext[1].getOffset();
            if (actualNextOffset != expectedNextOffset) {
                LOG.error(
                    "Missing offset(s) found in messages: missing from {} to {}",
                    currentOffset + 1, actualNextOffset - 1);
            }
        });
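
For comparison, the same "previous/next" pairing can be approximated with the plain JDK by streaming over indices of the sorted list instead of over the elements. This is only a sketch: the class name `PairwiseGaps`, the method `findGaps`, and the `"from..to"` string format are made up for illustration, and offsets are passed as plain `Long` values rather than `Message` objects.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class PairwiseGaps {
    // Hypothetical helper: returns one "from..to" entry (inclusive bounds)
    // for every gap between neighbouring offsets.
    static List<String> findGaps(List<Long> offsets) {
        List<Long> sorted = offsets.stream().sorted().collect(Collectors.toList());
        // Stream over index i and compare element i with element i - 1.
        return IntStream.range(1, sorted.size())
                .filter(i -> sorted.get(i) > sorted.get(i - 1) + 1)
                .mapToObj(i -> (sorted.get(i - 1) + 1) + ".." + (sorted.get(i) - 1))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Offsets 3, 4 and 7, 8 are missing here.
        System.out.println(findGaps(Arrays.asList(1L, 2L, 5L, 6L, 9L)));
    }
}
```

The index stream relies on cheap random access, so it suits an `ArrayList` but not a linked list.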

ZhekaKozlov answered Oct 04 '22 16:10


Sometimes, attempting to do everything with lambda expressions makes solutions more complicated. You can use:

messages.stream()
    .mapToLong(Message::getOffset)
    .sorted()
    .forEachOrdered(new LongConsumer() {
        boolean first=true;
        long expected;
        public void accept(long value) {
            if(first) first=false;
            else if(value!=expected)
                LOG.error("Missing offset(s) found in messages: missing from {} to {}",
                          expected, value-1);
            expected=value+1;
        }
    });

But note that however fluent the stream chain may look, sorted() is a stateful intermediate operation that creates and uses a backing array behind the scenes. You’re not losing anything if you use that array explicitly:

long[] l = messages.stream().mapToLong(Message::getOffset).toArray();
Arrays.sort(l);
for(int ix=1; ix<l.length; ix++) {
    long value = l[ix], expected = l[ix-1]+1;
    if(value!=expected)
        LOG.error("Missing offset(s) found in messages: missing from {} to {}",
                  expected, value-1);
}

It’s hard to find a simpler solution. But if you want to reduce the amount of memory needed, you can use a BitSet instead of an array:

OptionalLong optMin = messages.stream().mapToLong(Message::getOffset).min();
if(!optMin.isPresent()) return;
long min = optMin.getAsLong();
BitSet bset = messages.stream()
    .mapToLong(Message::getOffset)
    .collect(BitSet::new, (bs,l) -> bs.set((int)(l-min)), BitSet::or);
for(int set=0, clear; set>=0; ) {
    clear = bset.nextClearBit(set);
    set = bset.nextSetBit(clear);
    if(set >= 0)
        LOG.error("Missing offset(s) found in messages: missing from {} to {}",
                  min+clear, min+set-1);
}

This reduces the memory used significantly when there are no gaps, or only gaps that are small compared to the value range of the offsets. It fails when the distance between the smallest offset and the largest offset is greater than Integer.MAX_VALUE.
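
To make the BitSet walk easier to try out, here is a self-contained sketch that works on a plain `long[]` and returns the gaps instead of logging them. The class name `BitSetGaps`, the method `findGaps`, and the `{firstMissing, lastMissing}` result shape are assumptions for illustration. The range check is deliberately a bit more conservative than the limit described above.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.stream.LongStream;

class BitSetGaps {
    // Hypothetical helper: each result entry is {firstMissing, lastMissing}, inclusive.
    static List<long[]> findGaps(long[] offsets) {
        List<long[]> gaps = new ArrayList<>();
        if (offsets.length == 0) return gaps;
        long min = LongStream.of(offsets).min().getAsLong();
        long max = LongStream.of(offsets).max().getAsLong();
        // subtractExact throws on long overflow instead of wrapping around.
        long range = Math.subtractExact(max, min);
        if (range >= Integer.MAX_VALUE)
            throw new IllegalArgumentException("offset range too large for a BitSet");
        BitSet seen = new BitSet();
        for (long offset : offsets) seen.set((int) (offset - min));
        // Alternate between the next clear bit (gap start) and the next set bit (gap end).
        for (int set = 0, clear; set >= 0; ) {
            clear = seen.nextClearBit(set);
            set = seen.nextSetBit(clear);
            if (set >= 0) gaps.add(new long[] {min + clear, min + set - 1});
        }
        return gaps;
    }

    public static void main(String[] args) {
        for (long[] gap : findGaps(new long[] {10, 11, 14, 15, 20}))
            System.out.println("missing from " + gap[0] + " to " + gap[1]);
    }
}
```

For the sample input, the gaps are 12 to 13 and 16 to 19.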

You might check that beforehand, which also opens the opportunity to short-cut if there are no gaps at all:

LongSummaryStatistics stat = messages.stream()
    .mapToLong(Message::getOffset).summaryStatistics();
if(stat.getCount()==0 ||
   // all solutions assume that there are no duplicates; in that case,
   // the following test proves that there are no gaps:
   stat.getMax()-stat.getMin()==messages.size()-1) {
    return;
}

if(stat.getMax()-stat.getMin()>Integer.MAX_VALUE) {
    // proceed with array based test
    …
}
else {
    long min = stat.getMin();
    // proceed with BitSet based test
    …
}

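As a rough illustration of the short-cut test above, the following sketch isolates the "no gaps at all" check so it can be run on its own. The class name `GapShortcut` and the method `provablyGapFree` are hypothetical. Like the check in the answer, it assumes there are no duplicate offsets, and it does not guard against overflow of `max - min` for extreme values.

```java
import java.util.LongSummaryStatistics;
import java.util.stream.LongStream;

class GapShortcut {
    // Hypothetical helper: true when the offsets cannot contain a gap.
    // Assumes no duplicates: n distinct values spanning exactly n - 1
    // must be consecutive.
    static boolean provablyGapFree(long[] offsets) {
        LongSummaryStatistics stat = LongStream.of(offsets).summaryStatistics();
        return stat.getCount() == 0
                || stat.getMax() - stat.getMin() == stat.getCount() - 1;
    }

    public static void main(String[] args) {
        System.out.println(provablyGapFree(new long[] {5, 3, 4})); // consecutive once sorted
        System.out.println(provablyGapFree(new long[] {1, 2, 4})); // 3 is missing
    }
}
```

Only when this returns false is it worth running one of the array or BitSet based scans to locate the gaps.
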
Holger answered Oct 04 '22 15:10