Create stream of streams from one long stream

I want to split a single Stream into a Stream of Streams based on the contents of the Stream. Each resulting Stream should contain part of the original Stream's data.

My real application is more complex (it is grouping log lines that are within a list of time intervals), but my problem is how to handle the streams, so here I ask about a simplified example.

Example problem

I want to be able to split a Stream<Integer> into a Stream<Stream<Integer>> based on the same number being repeated, keeping only the streams with odd numbers.

For example, the following stream containing:

{1,1,1,2,2,2,3,6,7,7,1,1}

Would need to result in a stream of streams containing:

{{1,1,1},{3},{7,7},{1,1}}

I can leave out the even numbers by starting (or ending) with a filter:

Stream<Integer> input = ...;
Stream<Stream<Integer>> output = input.filter(this::isOdd).someOtherOperation();

This is not ideal, as it means evaluating each input value twice. That is acceptable, but I would prefer to avoid it.

Ideas for solutions

My current solution does this by iterating over the contents of the stream, building a List<List<Integer>>, and converting that to a Stream<Stream<Integer>>. However, this means the full result is kept in memory (which is undesirable for my application).

I also think I would be able to pull this off by writing my own Iterator that reads from the stream, but I am not sure how this would work.

Question

How can I convert a Stream into a Stream of Streams based on the contents of the original Stream, without first storing the full result as a List of Lists?

asked Jun 24 '15 08:06 by Thirler


3 Answers

You may want to implement your own aggregating spliterator to do this. There's already something similar in the proton-pack library (the first link redirects to the one implemented in proton-pack).

Note that you get a Stream<List<Integer>>. You could try to modify the implementation to produce a Stream<Stream<Integer>> directly, but you always need to buffer a small number of elements (depending on the window's size) to test whether you should create a new window or not. So for example:

StreamUtils.aggregate(Stream.of(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1), 
                      Objects::equals)
           .forEach(System.out::println);

outputs:

[1, 1, 1]
[2, 2, 2]
[3]
[6]
[7, 7]
[1, 1]
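
For readers who want to see roughly what such an aggregating spliterator involves, here is a minimal sketch using only the JDK. It is not the proton-pack implementation: the class name RunGrouping and the helper groupRuns are hypothetical names chosen for illustration. It buffers only the current run, is sequential only, and emits each run as a List once the next differing element (or the end of the source) is seen.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class RunGrouping {

    // Hypothetical helper (not a proton-pack API): lazily groups adjacent
    // elements for which sameGroup.test(previous, current) is true.
    public static <T> Stream<List<T>> groupRuns(Stream<T> source,
                                                BiPredicate<? super T, ? super T> sameGroup) {
        Spliterator<T> in = source.spliterator();
        Spliterator<List<T>> out = new Spliterators.AbstractSpliterator<List<T>>(
                Long.MAX_VALUE, Spliterator.ORDERED) {
            private List<T> buffer = new ArrayList<>(); // run currently being built
            private List<T> finished;                   // completed run, ready to emit

            private void accept(T element) {
                // A differing element closes the current run and starts a new one.
                if (!buffer.isEmpty()
                        && !sameGroup.test(buffer.get(buffer.size() - 1), element)) {
                    finished = buffer;
                    buffer = new ArrayList<>();
                }
                buffer.add(element);
            }

            @Override
            public boolean tryAdvance(Consumer<? super List<T>> action) {
                finished = null;
                while (finished == null && in.tryAdvance(this::accept)) {
                    // keep pulling from the source until one run is complete
                }
                if (finished == null && !buffer.isEmpty()) {
                    // source exhausted: flush the last run
                    finished = buffer;
                    buffer = new ArrayList<>();
                }
                if (finished == null) {
                    return false;
                }
                action.accept(finished);
                return true;
            }
        };
        return StreamSupport.stream(out, false); // sequential only
    }

    public static void main(String[] args) {
        Stream<Integer> input = Stream.of(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
        Stream<Stream<Integer>> output =
                groupRuns(input.filter(i -> (i & 1) == 1), Objects::equals)
                        .map(List::stream);
        // prints [1, 1, 1], [3], [7, 7], [1, 1] on separate lines
        output.map(s -> s.collect(Collectors.toList())).forEach(System.out::println);
    }
}
```

Filtering before grouping means each element is tested for oddness once, and only one run is buffered at a time; earlier runs become eligible for garbage collection once the consumer is done with them.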
answered Sep 30 '22 09:09 by Alexis C.


You can use my StreamEx library. It has groupRuns which does the job:

List<Integer> input = Arrays.asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
Stream<Stream<Integer>> streams = StreamEx.of(input).filter(this::isOdd)
    .groupRuns(Integer::equals)
    .map(List::stream);

Usage example:

streams.map(s -> StreamEx.of(s).joining(",")).forEach(System.out::println);

Output:

1,1,1
3
7,7
1,1

As in the protonpack library, there is a custom spliterator inside, but with StreamEx you can also take advantage of parallel processing (the protonpack one does not split at all).

In sequential processing at most one intermediate list resides in memory at a time (the others are eligible for GC). If you are still worried about memory consumption (for example, you have very long groups), there is an alternative way to solve this task, available since StreamEx 0.3.3:

Stream<Stream<Integer>> streams = StreamEx.of(input).filter(this::isOdd)
        .runLengths()
        .mapKeyValue(StreamEx::constant);

The runLengths method returns a stream of entries where the key is the element and the value is the number of adjacent repeating elements. After that, StreamEx.constant is used, which is a shortcut for Stream.generate(() -> value).limit(length). So you will have constant intermediate memory consumption even for very long groups. Of course, this version is also parallel-friendly.
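
To illustrate only the expansion step described above, here is a small plain-JDK sketch. It does not use StreamEx: the class RunLengthExpansion and its constant helper are hypothetical stand-ins that follow the Stream.generate(() -> value).limit(length) shortcut mentioned in the text, and the run-length entries are written by hand (an assumption for the demo) rather than computed from a stream.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class RunLengthExpansion {

    // Hypothetical stand-in for the shortcut described in the answer:
    // a stream that yields the same value `length` times.
    public static <T> Stream<T> constant(T value, long length) {
        return Stream.generate(() -> value).limit(length);
    }

    public static void main(String[] args) {
        // Hand-written (element, run length) pairs for the odd numbers
        // of the example input {1,1,1,2,2,2,3,6,7,7,1,1}.
        List<Map.Entry<Integer, Long>> runs = Arrays.asList(
                new SimpleImmutableEntry<Integer, Long>(1, 3L),
                new SimpleImmutableEntry<Integer, Long>(3, 1L),
                new SimpleImmutableEntry<Integer, Long>(7, 2L),
                new SimpleImmutableEntry<Integer, Long>(1, 2L));

        // Each inner stream holds only one value and a counter, never a list.
        Stream<Stream<Integer>> streams =
                runs.stream().map(e -> constant(e.getKey(), e.getValue()));

        // prints 1,1,1 then 3 then 7,7 then 1,1 on separate lines
        streams.map(s -> s.map(String::valueOf).collect(Collectors.joining(",")))
               .forEach(System.out::println);
    }
}
```

This only works because all elements of a run are equal, so a run is fully described by one element and a count. For the real use case in the question (log lines grouped by time interval) the elements of a group differ, so this representation would not apply directly.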

Update: StreamEx 0.3.3 is released, thus the second solution is now eligible as well.

answered Sep 30 '22 07:09 by Tagir Valeev


I am afraid it is not doable, at least not in a nice way. Even if you map the elements into streams and reduce them, these internal streams will have to know what elements they contain so they will have to store something.

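The simplest solution is to just use groupingBy; however, it will store all results in the map, and because it groups by value rather than by adjacency, the two separate runs of 1 end up in the same group: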

List<Integer> input = asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
Map<Integer, List<Integer>> grouped = input.stream().collect(groupingBy(i -> i));
Stream<Stream<Integer>> streamOfStreams = grouped.values().stream().map(list -> list.stream());
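
To see what the groupingBy approach produces for the example input, here is a minimal sketch using only the JDK. The class name GroupingByDemo and the helper groupByValue are made up for illustration, and a TreeMap is used (an assumption, not in the code above) only so the printed order is deterministic.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class GroupingByDemo {

    // Hypothetical helper: groups by the value itself, so position in the
    // input plays no role. The whole result is materialized in the map.
    public static Map<Integer, List<Integer>> groupByValue(List<Integer> input) {
        return input.stream()
                .collect(Collectors.groupingBy(i -> i, TreeMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
        // prints {1=[1, 1, 1, 1, 1], 2=[2, 2, 2], 3=[3], 6=[6], 7=[7, 7]}
        System.out.println(groupByValue(input));
    }
}
```

The leading run {1,1,1} and the trailing run {1,1} are merged into one group of five, which differs from the grouping by adjacent repeats that the question asks for.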

You could try using a reduce operation, but it would require you to implement your own Stream of Streams, in which you would have to store what elements every stream contains anyway. Not to mention that it would be a lot of effort to implement.

The best solution I can think of in your case would be to iterate over the list repeatedly, once to find the distinct odd values and once more for each of them:

public static void main(String[] args) {
    List<Integer> input = asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);

    input.stream().distinct().filter(i -> isOdd(i)).forEach(i -> {
        List<Integer> subList = input.stream().filter(j -> Objects.equals(j, i)).collect(toList());
        System.out.println(subList); // do something with the stream instead of collecting to list
    });
}

private static boolean isOdd(Integer i) {
    return (i & 1) == 1;
}

Note however that it has O(n^2) time complexity.

EDIT:

This solution will only have local groups of elements. It stores only the current local group.

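// Needs java.util.ArrayList, Iterator, List, Objects and java.util.stream.Stream.
public static void main(String[] args) {
    Stream<Integer> input = Stream.of(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);

    Iterator<Integer> iterator = input.iterator();
    if (!iterator.hasNext()) {
        return; // empty stream: there are no groups
    }
    Integer previous = iterator.next();

    List<Integer> buffer = new ArrayList<>();
    buffer.add(previous);

    // A plain while loop also handles a single-element stream, where a
    // do/while would call next() on an exhausted iterator.
    while (iterator.hasNext()) {
        Integer current = iterator.next();

        if (Objects.equals(previous, current)) {
            buffer.add(current);
        } else {
            doSomethingWithTheGroup(buffer);
            buffer = new ArrayList<>(); // let GC remove the previous buffer
            buffer.add(current);
        }
        previous = current;
    }
    doSomethingWithTheGroup(buffer); // flush the last group
}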

private static void doSomethingWithTheGroup(List<Integer> buffer) {
    System.out.println(buffer);
}

private static boolean isOdd(Integer i) {
    return (i & 1) == 1;
}

output:

[1, 1, 1]
[2, 2, 2]
[3]
[6]
[7, 7]
[1, 1]
answered Sep 30 '22 08:09 by Jaroslaw Pawlak