I'm making my very own version of Java's Stream library for fun. Here's my class signature:
class Stream<T> {
Supplier<T> head;
Supplier<Stream<T>> tail;
...
}
Also, I wrote a basic infinite stream iterator that would generate an infinite list based on the given function:
public static <T> Stream<T> iterate(T first, Function<T, T> f) {
return new Stream<T>(
() -> first,
() -> {
T nextElem = f.apply(first);
if (nextElem == null) {
return generate(() -> null);
} else {
return iterate(nextElem, f);
}
}
);
}
The function generate
is a special case of iterate that repeats a given element forever. In the function above, I'm generating an infinite sequence of null
to indicate the end of the stream (I don't think I'll be storing null values in the stream).
I then wrote a reduce function where the reducing function is lazy on its second argument:
public <U> U reduce(U acc, Function<T, Function<Supplier<U>, U>> f) {
System.out.println("REDUCE CALL");
T elem = head.get();
if (elem != null) {
return f.apply(elem).apply(() -> this.tail.get().reduce(acc, f));
} else {
return acc;
}
}
Building upon the reduce function, I wrote the filter function.
public Stream<T> filter(Predicate<T> p) {
System.out.println("FILTER");
return reduce(generate(() -> null), elem -> acc -> {
if (p.test(elem)) {
return new Stream<>(
() -> elem,
() -> acc.get()
);
} else {
return acc.get();
}
});
}
Finally, I proceeded to use my very own Stream class:
public static void main(String[] args) {
Stream<Integer> ilist =
Stream
.iterate(1, x -> x + 1)
.filter(x -> x >= 5);
}
But filter isn't lazy! From the output given below, I think filter evaluates the elements until it finds one that matches the given predicate.
FILTER
REDUCE CALL
REDUCE CALL
REDUCE CALL
REDUCE CALL
REDUCE CALL
What's wrong with my code, and how can I make my filter function lazy again?
Update: Based on Sweeper's remarks, I had another go at the filter function without using reduce.
public Stream<T> filter2(Predicate<T> p) {
System.out.println("FILTER2");
T elem = head.get();
if (elem == null) {
return generate(() -> null);
} else {
if (p.test(elem)) {
return new Stream<>(
() -> elem,
() -> this.tail.get().filter2(p)
);
} else {
return this.tail.get().filter2(p);
}
}
}
However, this function isn't lazy as well. The output of my main function using filter2
is as follows:
FILTER2
FILTER2
FILTER2
FILTER2
FILTER2
How can I fix this, and is there a way to implement a lazy filter through a lazy reduce?
Acknowledgements: This exercise and the implementation of the above functions were inspired by the book Functional Programming in Scala by Chiusano and Bjarnason.
In the version that you wrote without reduce
, the case where the element exists but does not satisfy the predicate is not lazy. Instead of wrapping the recursive call in a supplier lambda as you did in the other case, you eagerly get the tail and immediately filter it.
public Stream<T> filter2(Predicate<T> p) {
System.out.println("FILTER2");
T elem = head.get();
if (elem == null) {
return generate(() -> null);
} else {
if (p.test(elem)) {
return new Stream<>(
() -> elem,
() -> this.tail.get().filter2(p)
);
} else {
return this.tail.get().filter2(p); // <- not lazy!
}
}
}
What you need is a way to create a stream such that the decision as to whether it's empty or not is deferred until later.
public class Stream<T> {
// private constructor(s)
public static <T> Stream<T> empty() { /* ... */ }
public static <T> Stream<T> cons(Supplier<T> head, Supplier<Stream<T> tail) { /* ... */ }
public static <T> Stream<T> lazy(Supplier<Stream<T>> stream) { /* ... */ }
public Stream<T> filter(Predicate<T> p) {
if ( /* this stream is empty */ ) {
return Stream.empty();
} else if ( /* head element satisfies predicate */ ) {
// lazily filter tail, cons head element
} else {
return Stream.lazy(() -> this.tail.get().filter(p));
}
}
}
Something along those lines.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With