I want to split a single Stream into a Stream of Streams based on the contents of the original Stream. Each resulting Stream should contain part of the original stream's data.
My real application is more complex (it is grouping log lines that are within a list of time intervals), but my problem is how to handle the streams, so here I ask about a simplified example.
I want to be able to split a Stream<Integer> into a Stream<Stream<Integer>> based on runs of the same repeated number, keeping only the streams of odd numbers.
For example, the following stream:
{1,1,1,2,2,2,3,6,7,7,1,1}
would need to result in a stream of streams containing:
{{1,1,1},{3},{7,7},{1,1}}
I can leave out the even numbers by starting (or ending) with a filter:
Stream<Integer> input = ...;
Stream<Stream<Integer>> output = input.filter(this::isOdd).someOtherOperation();
This is not ideal, as it would mean evaluating each input value twice; that is acceptable, but I would prefer to avoid it.
My current solution does this by iterating over the contents of the stream, creating a List<List<Integer>>, and converting that to a Stream<Stream<Integer>>. However, this means the full result is kept in memory (which is undesirable for my application).
I also think I could pull this off by writing my own Iterator that reads from the stream, but I am not sure how this would work.
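The Iterator idea can work. Here is a minimal sketch using only the JDK (Java 8+); runs and oddRuns are hypothetical helper names, not part of any library. It groups adjacent equal elements lazily, holds only the run being built plus one look-ahead element, and wraps the iterator back into a Stream with Spliterators.spliteratorUnknownSize. Grouping happens before the odd test, so the test runs once per run rather than once per element.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class RunIterator {

    // Hypothetical helper: lazily groups adjacent equal elements of 'source'.
    // Only the run being built plus one look-ahead element are held in memory.
    static <T> Iterator<List<T>> runs(Iterator<T> source) {
        return new Iterator<List<T>>() {
            private T pending;          // first element of the next run
            private boolean hasPending; // true if 'pending' holds a value
            private boolean started;    // true once the first element was read

            private void start() {
                if (!started) {
                    started = true;
                    if (source.hasNext()) {
                        pending = source.next();
                        hasPending = true;
                    }
                }
            }

            @Override
            public boolean hasNext() {
                start();
                return hasPending;
            }

            @Override
            public List<T> next() {
                start();
                if (!hasPending) {
                    throw new NoSuchElementException();
                }
                List<T> run = new ArrayList<>();
                run.add(pending);
                hasPending = false;
                while (source.hasNext()) {
                    T next = source.next();
                    if (Objects.equals(run.get(0), next)) {
                        run.add(next);      // same value: extend the run
                    } else {
                        pending = next;     // different value: starts the next run
                        hasPending = true;
                        break;
                    }
                }
                return run;
            }
        };
    }

    // Groups first, then keeps only runs of odd numbers.
    static Stream<Stream<Integer>> oddRuns(Stream<Integer> input) {
        Iterator<List<Integer>> grouped = runs(input.iterator());
        return StreamSupport.stream(
                        Spliterators.spliteratorUnknownSize(grouped, Spliterator.ORDERED), false)
                .filter(run -> (run.get(0) & 1) == 1)
                .map(List::stream);
    }

    public static void main(String[] args) {
        oddRuns(Stream.of(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1))
                .forEach(run -> System.out.println(run.collect(Collectors.toList())));
    }
}
```

Each inner Stream is still backed by one buffered run, but never by the whole result. Because grouping precedes the odd filter, input such as 1,2,1 yields two separate runs of 1 instead of one merged run.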
How can I convert a Stream into a Stream of Streams based on the contents of the original Stream, without first storing the full result as a List of Lists?
You may want to implement your own aggregating spliterator to do this. There's already something similar in the proton-pack library (StreamUtils.aggregate).
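A minimal sketch of such an aggregating spliterator, built on Spliterators.AbstractSpliterator from the JDK. This is not proton-pack's source; aggregate here is a hypothetical helper that pulls elements from the source spliterator and emits a List each time the BiPredicate fails between the previous and the current element.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class Aggregate {

    // Hypothetical helper (a sketch, not proton-pack's code): emits adjacent
    // elements as one List while 'sameGroup' holds for (previous, current).
    static <T> Stream<List<T>> aggregate(Stream<T> source,
                                         BiPredicate<? super T, ? super T> sameGroup) {
        Spliterator<T> src = source.spliterator();
        Spliterator<List<T>> grouped = new Spliterators.AbstractSpliterator<List<T>>(
                Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL) {
            private List<T> run; // group being built; null when nothing is buffered
            private T element;   // element most recently pulled from 'src'

            private void take(T t) {
                element = t;
            }

            @Override
            public boolean tryAdvance(Consumer<? super List<T>> action) {
                while (src.tryAdvance(this::take)) {
                    if (run == null) {
                        run = new ArrayList<>();
                        run.add(element);
                    } else if (sameGroup.test(run.get(run.size() - 1), element)) {
                        run.add(element);
                    } else {
                        List<T> finished = run;   // boundary found: emit the finished group
                        run = new ArrayList<>();
                        run.add(element);         // boundary element starts the next group
                        action.accept(finished);
                        return true;
                    }
                }
                if (run == null) {
                    return false;                 // source exhausted, nothing left to emit
                }
                List<T> last = run;               // flush the final group
                run = null;
                action.accept(last);
                return true;
            }
        };
        return StreamSupport.stream(grouped, false).onClose(source::close);
    }

    public static void main(String[] args) {
        aggregate(Stream.of(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1), Objects::equals)
                .forEach(System.out::println);
    }
}
```

To get the result asked for in the question, one could append .filter(run -> (run.get(0) & 1) == 1).map(List::stream) to the aggregated stream. Like proton-pack's version, this sketch is meant for sequential use.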
Note that you get a Stream<List<Integer>> (you may try to modify the implementation to produce a Stream<Stream<Integer>> directly, but you always need to buffer a small number of elements, depending on the window's size, to test whether you should create a new window or not). So, for example:
StreamUtils.aggregate(Stream.of(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1),
Objects::equals)
.forEach(System.out::println);
outputs:
[1, 1, 1]
[2, 2, 2]
[3]
[6]
[7, 7]
[1, 1]
You can use my StreamEx library. It has groupRuns, which does the job:
List<Integer> input = Arrays.asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
Stream<Stream<Integer>> streams = StreamEx.of(input).filter(this::isOdd)
.groupRuns(Integer::equals)
.map(List::stream);
Usage example:
streams.map(s -> StreamEx.of(s).joining(",")).forEach(System.out::println);
Output:
1,1,1
3
7,7
1,1
As with the protonpack library, there's a custom spliterator inside, but with StreamEx you can take advantage of parallel processing (protonpack's spliterator does not split at all).
In sequential processing, at most one intermediate list resides in memory at a time (the others are eligible for GC). If you still worry about memory consumption (for example, if you have very long groups), there is an alternative way to solve this task since StreamEx 0.3.3:
Stream<Stream<Integer>> streams = StreamEx.of(input).filter(this::isOdd)
.runLengths()
.mapKeyValue(StreamEx::constant);
The runLengths method returns a stream of entries where the key is the element and the value is the number of adjacent repeating elements. After that, StreamEx.constant is used, which is a shortcut for Stream.generate(() -> value).limit(length). So you will have constant intermediate memory consumption even for very long groups. Of course, this version is also parallel-friendly.
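If adding StreamEx is not an option, the same run-length idea can be sketched with the JDK alone. This is a stand-in under stated assumptions, not StreamEx's implementation: runLengths and oddRuns are hypothetical names, each entry is an AbstractMap.SimpleImmutableEntry of (value, count), and the sketch targets sequential streams only. Per run it keeps just a counter, then rebuilds each inner stream with Stream.generate(...).limit(count).

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class RunLengths {

    // Hypothetical plain-JDK stand-in for runLengths(): emits (element, count)
    // for each run of adjacent equal elements, counting instead of buffering.
    static <T> Stream<Map.Entry<T, Long>> runLengths(Stream<T> source) {
        Iterator<T> it = source.iterator();
        Iterator<Map.Entry<T, Long>> entries = new Iterator<Map.Entry<T, Long>>() {
            private T pending;          // first element of the next run
            private boolean hasPending; // true if 'pending' holds a value
            private boolean started;    // true once the first element was read

            private void start() {
                if (!started) {
                    started = true;
                    if (it.hasNext()) {
                        pending = it.next();
                        hasPending = true;
                    }
                }
            }

            @Override
            public boolean hasNext() {
                start();
                return hasPending;
            }

            @Override
            public Map.Entry<T, Long> next() {
                start();
                if (!hasPending) {
                    throw new NoSuchElementException();
                }
                T value = pending;
                long count = 1;
                hasPending = false;
                while (it.hasNext()) {
                    T next = it.next();
                    if (Objects.equals(value, next)) {
                        count++;            // only a counter, no buffered elements
                    } else {
                        pending = next;     // starts the following run
                        hasPending = true;
                        break;
                    }
                }
                return new SimpleImmutableEntry<>(value, count);
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(entries, Spliterator.ORDERED), false);
    }

    // Keeps odd runs and regenerates each one on demand.
    static Stream<Stream<Integer>> oddRuns(Stream<Integer> input) {
        return runLengths(input)
                .filter(e -> (e.getKey() & 1) == 1)
                .map(e -> Stream.generate(e::getKey).limit(e.getValue()));
    }

    public static void main(String[] args) {
        oddRuns(Stream.of(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1))
                .forEach(run -> System.out.println(run.collect(Collectors.toList())));
    }
}
```

The trade-off is the same as with runLengths: it only works when every element of a run is interchangeable with the first one, since the inner stream repeats the key rather than replaying the original objects.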
Update: StreamEx 0.3.3 has been released, so the second solution is now available as well.
I am afraid it is not doable, at least not in a nice way. Even if you map the elements into streams and reduce them, these internal streams will have to know what elements they contain so they will have to store something.
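The simplest solution is to just use groupingBy; however, it will store all results in the map. Note also that it groups equal values globally rather than by adjacent runs, so both runs of 1 end up in the same list: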
List<Integer> input = asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
Map<Integer, List<Integer>> grouped = input.stream().collect(groupingBy(i -> i));
Stream<Stream<Integer>> streamOfStreams = grouped.values().stream().map(list -> list.stream());
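To see concretely what groupingBy does with the question's input, here is a small illustration; groupOdd is a hypothetical name, the odd filter is added to match the question, and the TreeMap is used only so the printed order is deterministic (a plain groupingBy returns a HashMap with no guaranteed order).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class GroupingByDemo {

    // Groups odd values by value; non-adjacent equal values land in one list.
    static Map<Integer, List<Integer>> groupOdd(List<Integer> input) {
        return input.stream()
                .filter(i -> (i & 1) == 1)
                .collect(Collectors.groupingBy(i -> i, TreeMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        // prints {1=[1, 1, 1, 1, 1], 3=[3], 7=[7, 7]}
        System.out.println(groupOdd(Arrays.asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1)));
    }
}
```

The first run {1,1,1} and the last run {1,1} are merged into a single list of five 1s, which differs from the {{1,1,1},{3},{7,7},{1,1}} the question asks for.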
You could try using a reduce operation, but it would require you to implement your own Stream of Streams, in which you would have to store what elements every stream contains anyway. Not to mention that it would be a lot of effort to implement.
The best solution I can think of in your case would be to iterate over the list once to find the distinct odd values, and then once more for each of those values:
import static java.util.Arrays.asList;
import static java.util.stream.Collectors.toList;

import java.util.List;
import java.util.Objects;

public class Main {
    public static void main(String[] args) {
        List<Integer> input = asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
        input.stream().distinct().filter(i -> isOdd(i)).forEach(i -> {
            List<Integer> subList = input.stream().filter(j -> Objects.equals(j, i)).collect(toList());
            System.out.println(subList); // do something with the stream instead of collecting to list
        });
    }

    private static boolean isOdd(Integer i) {
        return (i & 1) == 1;
    }
}
Note, however, that it has O(n^2) time complexity.
EDIT:
This solution groups only local (adjacent) runs of elements, and it stores only the current local group.
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;

public class Main {
    public static void main(String[] args) {
        Stream<Integer> input = Stream.of(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
        Iterator<Integer> iterator = input.iterator();
        if (!iterator.hasNext()) {
            return; // empty input: no groups
        }
        Integer previous = iterator.next();
        List<Integer> buffer = new ArrayList<>();
        buffer.add(previous);
        while (iterator.hasNext()) { // also safe for a single-element stream
            Integer current = iterator.next();
            if (Objects.equals(previous, current)) {
                buffer.add(current); // same run: extend the current group
            } else {
                doSomethingWithTheGroup(buffer);
                buffer = new ArrayList<>(); // let GC remove the previous buffer
                buffer.add(current);
            }
            previous = current;
        }
        doSomethingWithTheGroup(buffer); // flush the last group
    }

    private static void doSomethingWithTheGroup(List<Integer> buffer) {
        System.out.println(buffer);
    }

    private static boolean isOdd(Integer i) {
        return (i & 1) == 1;
    }
}
output:
[1, 1, 1]
[2, 2, 2]
[3]
[6]
[7, 7]
[1, 1]