A Java 8 Stream can be consumed only once. That makes it awkward to build a sliding-window Flux on top of a stream in order to calculate a relationship like x(i)*x(i-1).
The following code is based on the idea of a shift operator: I shift the first stream with skip(1) to create a second stream, then zip the two together.
Flux<Integer> primary = Flux.fromStream(IntStream.range(1, 10).boxed());
Flux<Integer> secondary = primary.skip(1);
primary.zipWith(secondary)
    .map(t -> t.getT1() * t.getT2())
    .subscribe(System.out::println);
Here is a visual representation of the above code:
1 2 3 4 5 6 7 8 9
v v v v v v v v v skip(1)
2 3 4 5 6 7 8 9
v v v v v v v v v zipWith
1 2, 2 3, 3 4, 4 5, 5 6, 6 7, 7 8, 8 9 <- sliding window of length 2
v v v v v v v v v multiply
2 6 12 20 30 42 56 72
Unfortunately, this code fails with:
java.lang.IllegalStateException: stream has already been operated upon or closed
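The exception comes from java.util.stream.Stream itself: zipWith subscribes to primary twice (once directly and once through skip(1)), so the wrapped Stream instance gets consumed a second time. The same failure can be reproduced without Reactor. The sketch below is only an illustration; the class and method names (StreamReuseDemo, secondUseFails, shiftedByOne) are made up for this example.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class StreamReuseDemo {

    // Consumes one Stream instance twice and reports whether the second use is rejected.
    static boolean secondUseFails() {
        Stream<Integer> stream = IntStream.range(1, 10).boxed();
        stream.collect(Collectors.toList());              // first terminal operation
        try {
            stream.skip(1).collect(Collectors.toList());  // same instance again
            return false;
        } catch (IllegalStateException e) {
            return true;  // "stream has already been operated upon or closed"
        }
    }

    // A Supplier hands out a fresh Stream per call, at the cost of re-running the source.
    static List<Integer> shiftedByOne(Supplier<Stream<Integer>> source) {
        return source.get().skip(1).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Supplier<Stream<Integer>> source = () -> IntStream.range(1, 10).boxed();
        System.out.println(secondUseFails());
        System.out.println(source.get().collect(Collectors.toList()));
        System.out.println(shiftedByOne(source));
    }
}
```

A Supplier avoids the exception, but only by building the sequence again for every consumer, which is the re-execution cost the question wants to avoid.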
The obvious work-around is to cache the elements and ensure the cache size is greater than or equal to the stream size:
Flux<Integer> primary = Flux.fromStream(IntStream.range(1, 10).boxed()).cache(10);
or to replace the stream with a Flux source that can be subscribed to more than once:
Flux<Integer> primary = Flux.range(1, 9);
The second solution simply re-executes the original sequence for the skip(1) subscription.
However, an efficient solution needs a buffer of only 2 elements. This matters a lot if the stream happens to be a large file:
Files.lines(Paths.get(megaFile));
How can I buffer a stream efficiently so that multiple subscriptions to the primary Flux neither read everything into memory nor re-execute the source?
I finally found a solution, although it is not buffer-oriented. The idea was to first solve the problem for a sliding window of size 2:
Flux<Integer> primary = Flux.fromStream(IntStream.range(0, 10).boxed());
primary.flatMap(num -> Flux.just(num, num))
    .skip(1)
    .buffer(2)
    .filter(list -> list.size() == 2)
    .map(list -> Arrays.toString(list.toArray()))
    .subscribe(System.out::println);
A visual representation of the process follows:
0 1 2 3 4 5 6 7 8 9
V V V V V V V V V V Flux.just(num, num)
0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9
V V V V V V V V V V skip(1)
0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9
V V V V V V V V V V buffer(2)
0 1, 1 2, 2 3, 3 4, 4 5, 5 6, 6 7, 7 8, 8 9, 9
V V V V V V V V V V filter
0 1, 1 2, 2 3, 3 4, 4 5, 5 6, 6 7, 7 8, 8 9
This is the output:
[0, 1]
[1, 2]
[2, 3]
[3, 4]
[4, 5]
[5, 6]
[6, 7]
[7, 8]
[8, 9]
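To see why duplicating, skipping one element, and chunking by 2 yields overlapping pairs, the four steps can be traced on a plain List. This is only a sketch for illustration: it materializes intermediate lists, which the Flux pipeline does not need to do, and the names DuplicateSkipChunk and pairs are invented for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DuplicateSkipChunk {

    // Mirrors flatMap(just(n, n)) -> skip(1) -> buffer(2) -> filter(size == 2) on a List.
    static List<List<Integer>> pairs(List<Integer> input) {
        // Step 1: emit every element twice.
        List<Integer> doubled = new ArrayList<>();
        for (Integer n : input) {
            doubled.add(n);
            doubled.add(n);
        }
        // Step 2: drop the first copy so the chunk boundaries shift by one.
        List<Integer> shifted = doubled.isEmpty() ? doubled : doubled.subList(1, doubled.size());
        // Steps 3 and 4: cut into chunks of 2 and keep only the complete ones.
        List<List<Integer>> result = new ArrayList<>();
        for (int i = 0; i + 1 < shifted.size(); i += 2) {
            result.add(Arrays.asList(shifted.get(i), shifted.get(i + 1)));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(pairs(Arrays.asList(0, 1, 2, 3))); // [[0, 1], [1, 2], [2, 3]]
    }
}
```

Because the shifted list always has an odd length, the last chunk holds a single element, which is why the filter on size 2 is needed.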
Then I generalized the above idea to create a solution for an arbitrary sliding window size:
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;

public class SlidingWindow {

    public static void main(String[] args) {
        System.out.println("Different sliding windows for sequence 0 to 9:");
        SlidingWindow flux = new SlidingWindow();
        for (int windowSize = 1; windowSize < 5; windowSize++) {
            flux.slidingWindow(windowSize, IntStream.range(0, 10).boxed())
                .map(SlidingWindow::listToString)
                .subscribe(System.out::print);
            System.out.println();
        }

        // show stream difference: x(i)-x(i-1)
        List<Integer> sequence = Arrays.asList(10, 12, 11, 9, 13, 17, 21);
        System.out.println("Show difference 'x(i)-x(i-1)' for " + listToString(sequence));
        flux.slidingWindow(2, sequence.stream())
            .doOnNext(SlidingWindow::printlist)
            .map(list -> list.get(1) - list.get(0))
            .subscribe(System.out::println);
        System.out.println();
    }

    // Starts with windows of size 1 and widens them by one element per addDepth call.
    public <T> Flux<List<T>> slidingWindow(int windowSize, Stream<T> stream) {
        if (windowSize > 0) {
            Flux<List<T>> flux = Flux.fromStream(stream).map(ele -> Arrays.asList(ele));
            for (int i = 1; i < windowSize; i++) {
                flux = addDepth(flux);
            }
            return flux;
        } else {
            return Flux.empty();
        }
    }

    // Pairs each window with its successor (duplicate, skip 1, buffer 2) and merges the pair.
    // concatMap could be used instead of flatMap to make the ordering guarantee explicit.
    protected <T> Flux<List<T>> addDepth(Flux<List<T>> flux) {
        return flux.flatMap(list -> Flux.just(list, list))
            .skip(1)
            .buffer(2)
            .filter(list -> list.size() == 2)
            .map(list -> flatten(list));
    }

    // Merges two consecutive windows: first element of the earlier one, then the whole later one.
    protected <T> List<T> flatten(List<List<T>> list) {
        LinkedList<T> newl = new LinkedList<>(list.get(1));
        newl.addFirst(list.get(0).get(0));
        return newl;
    }

    static String listToString(List<?> list) {
        return list.stream()
            .map(Object::toString)
            .collect(Collectors.joining(", ", "[ ", " ], "));
    }

    static void printlist(List<?> list) {
        System.out.print(listToString(list));
    }
}
The output of the above code is as follows:
Different sliding windows for sequence 0 to 9:
[ 0 ], [ 1 ], [ 2 ], [ 3 ], [ 4 ], [ 5 ], [ 6 ], [ 7 ], [ 8 ], [ 9 ],
[ 0, 1 ], [ 1, 2 ], [ 2, 3 ], [ 3, 4 ], [ 4, 5 ], [ 5, 6 ], [ 6, 7 ], [ 7, 8 ], [ 8, 9 ],
[ 0, 1, 2 ], [ 1, 2, 3 ], [ 2, 3, 4 ], [ 3, 4, 5 ], [ 4, 5, 6 ], [ 5, 6, 7 ], [ 6, 7, 8 ], [ 7, 8, 9 ],
[ 0, 1, 2, 3 ], [ 1, 2, 3, 4 ], [ 2, 3, 4, 5 ], [ 3, 4, 5, 6 ], [ 4, 5, 6, 7 ], [ 5, 6, 7, 8 ], [ 6, 7, 8, 9 ],
Show difference 'x(i)-x(i-1)' for [ 10, 12, 11, 9, 13, 17, 21 ],
[ 10, 12 ], 2
[ 12, 11 ], -1
[ 11, 9 ], -2
[ 9, 13 ], 4
[ 13, 17 ], 4
[ 17, 21 ], 4
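For comparison, a buffer-oriented variant that keeps at most windowSize elements in memory can be sketched in plain Java with an ArrayDeque. This is not part of the Reactor solution above. It is a minimal illustration under the assumption that the source is available as an Iterator (for example from Stream.iterator()) and contains no null elements, and the class name DequeSlidingWindow is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class DequeSlidingWindow<T> implements Iterator<List<T>> {
    private final Iterator<T> source;
    private final int windowSize;
    private final Deque<T> window = new ArrayDeque<>();

    DequeSlidingWindow(Iterator<T> source, int windowSize) {
        if (windowSize < 1) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.source = source;
        this.windowSize = windowSize;
    }

    @Override
    public boolean hasNext() {
        // Top the deque up to windowSize; it never holds more than that.
        while (window.size() < windowSize && source.hasNext()) {
            window.addLast(source.next());
        }
        return window.size() == windowSize;
    }

    @Override
    public List<T> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        List<T> snapshot = new ArrayList<>(window); // copy so callers may keep it
        window.removeFirst();                       // slide forward by one element
        return snapshot;
    }

    public static void main(String[] args) {
        Iterator<Integer> data = Arrays.asList(10, 12, 11, 9, 13, 17, 21).iterator();
        DequeSlidingWindow<Integer> windows = new DequeSlidingWindow<>(data, 2);
        while (windows.hasNext()) {
            List<Integer> w = windows.next();
            System.out.println(w + " " + (w.get(1) - w.get(0)));
        }
    }
}
```

The source is read exactly once and only the current window is retained, which is the property asked for in the question. Within Reactor itself, the buffer(int maxSize, int skip) overload may be worth checking for the same purpose, since a skip smaller than maxSize produces overlapping buffers.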