
Merge Two Streams

I am trying to implement a method to merge the values in two Streams based on a Comparator for the values.

I had a way to do this, where I iterate over the streams and insert the values into a Stream.Builder, but I have not been able to figure out how to make a lazy-evaluated version (the way many stream operations are), so it can deal with infinite streams as well.

All I want it to do is perform a single merging pass on the input data, not sort the streams (in fact, it is likely that the streams will be disordered; this disorder needs to be preserved).

static <E> Stream<E> merge(Stream<E> first, Stream<E> second, Comparator<E> c)

How can I lazily merge two streams like this?

If I were doing this with two Queues as input and some Consumer as output, it would be fairly simple:

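<E> void merge(Queue<E> first, Queue<E> second, Consumer<E> out, Comparator<E> c) {
    // Repeatedly emit the smaller of the two heads (ties go to first)
    while (!first.isEmpty() && !second.isEmpty()) {
        if (c.compare(first.peek(), second.peek()) <= 0)
            out.accept(first.remove());
        else
            out.accept(second.remove());
    }
    // At most one queue still has elements; emit whatever remains
    for (E e : first)
        out.accept(e);
    for (E e : second)
        out.accept(e);
}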

But I need to do this with lazy evaluation, and streams.

To address the comments, here are some example inputs and the result:

Example 1:

merge(
    Stream.of(1, 2, 3, 1, 2, 3),
    Stream.of(2, 2, 3, 2, 2, 2),
    Comparator.naturalOrder()
);

would return a stream that would produce this sequence:

1, 2, 2, 2, 3, 3, 1, 2, 2, 2, 2, 3

Example 2:

merge(
    Stream.iterate(5, i->i-1),
    Stream.iterate(1, i->i+1),
    Comparator.naturalOrder()
);

would return an infinite (well, an INT_MAX + 5 item) stream that would produce the sequence:

1, 2, 3, 4, 5, 5, 4, 3, 2, 1, 0, -1 ...

As you can see, this is not merely Stream.concat(first, second).sorted(), since (a) you can't sort infinite streams, and (b) even when you can sort the streams, it does not give the desired result.

Asked by AJMansfield on Apr 09 '14 at 21:04


1 Answer

You need to implement a Spliterator rather than going through Stream.Builder. Since merging is a fairly sequential operation, you might even just go through an Iterator and wrap it. Using Guava lightly:

return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
    Iterators.mergeSorted(
      Arrays.asList(stream1.iterator(), stream2.iterator()),
      comparator),
    Spliterator.ORDERED),
  false /* not parallel */ );
Answered by Louis Wasserman on Sep 21 '22 at 12:09
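
For readers who would rather not pull in Guava, the Iterator-based idea in the answer can probably be sketched with the JDK alone. The version below is a minimal illustration, not the answerer's code: the class name LazyMerge and the fields headA, headB, hasA and hasB are hypothetical, and it assumes the tie-breaking rule of the queue-based loop in the question (ties go to the first stream). It buffers at most one element per input, so it should work with infinite streams as long as the result is truncated downstream.

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class LazyMerge {

    // Hypothetical helper: one lazy merging pass over two streams, no sorting.
    static <E> Stream<E> merge(Stream<E> first, Stream<E> second, Comparator<? super E> c) {
        Iterator<E> a = first.iterator();
        Iterator<E> b = second.iterator();

        Iterator<E> merged = new Iterator<E>() {
            // Buffered head of each input; the flags say whether a head is currently buffered
            private E headA, headB;
            private boolean hasA, hasB;

            // Pull at most one element from each input if its buffer slot is empty
            private void fill() {
                if (!hasA && a.hasNext()) { headA = a.next(); hasA = true; }
                if (!hasB && b.hasNext()) { headB = b.next(); hasB = true; }
            }

            @Override
            public boolean hasNext() {
                fill();
                return hasA || hasB;
            }

            @Override
            public E next() {
                fill();
                if (!hasA && !hasB) throw new NoSuchElementException();
                // Take from the first input when the second is exhausted
                // or when the first head compares <= the second head
                boolean takeA = hasA && (!hasB || c.compare(headA, headB) <= 0);
                E result;
                if (takeA) { result = headA; headA = null; hasA = false; }
                else       { result = headB; headB = null; hasB = false; }
                return result;
            }
        };

        // Wrap the iterator as a sequential, ordered stream of unknown size
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(merged, Spliterator.ORDERED), false);
    }

    public static void main(String[] args) {
        // Disordered inputs stay disordered: this is a merge pass, not a sort
        System.out.println(
            merge(Stream.of(3, 1), Stream.of(2, 5), Comparator.<Integer>naturalOrder())
                .collect(Collectors.toList()));   // prints [2, 3, 1, 5]

        // Infinite inputs work as long as the result is limited
        System.out.println(
            merge(Stream.iterate(1, i -> i + 2), Stream.iterate(2, i -> i + 2),
                  Comparator.<Integer>naturalOrder())
                .limit(6)
                .collect(Collectors.toList()));   // prints [1, 2, 3, 4, 5, 6]
    }
}
```

Note that this sketch does not propagate close() to the input streams and, like the Guava version, produces a sequential stream; both would need extra work if the inputs hold resources or if parallel execution matters.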