I am trying to understand if there is a way to terminate a reduction operation without examining the whole stream, and I cannot figure one out.
The use case is roughly as follows: there is a long list of Integers which needs to be folded into an Accumulator. Each element examination is potentially expensive, so within the accumulating function I perform a check on the incoming Accumulator to see if we even need to perform the expensive operation - if we don't, I simply return the accumulator unchanged.
This is obviously a fine solution for small(er) lists, but huge lists incur unnecessary element-visiting costs I'd like to avoid.
Here's a code sketch - assume serial reductions only.
class Accumulator {
    private final Set<A> setA = new HashSet<>();
    private final Set<B> setB = new HashSet<>();
}
class ResultSupplier implements Supplier<Result> {

    private final List<Integer> ids;

    @Override
    public Result get() {
        Accumulator acc = ids.stream().reduce(new Accumulator(), f(), (x, y) -> null);
        return (acc.setA.size() > 1) ? Result.invalid() : Result.valid(acc.setB);
    }

    private static BiFunction<Accumulator, Integer, Accumulator> f() {
        return (acc, element) -> {
            if (acc.setA.size() <= 1) {
                // perform expensive ops and accumulate results
            }
            return acc;
        };
    }
}
In addition to having to traverse the whole Stream, there is another thing I dislike: I have to check the same condition twice (namely, the setA size check).
I have considered map() and collect() operations, but they just seemed like more of the same and didn't materially change the fact that I can't finish the fold operation without examining the entire stream.
Additionally, my thinking is that an imaginary takeWhile(p : (A) => boolean) counterpart in the Stream API would also buy us nothing, as the terminating condition depends on the accumulator, not on the stream elements per se.
Bear in mind I am a relative newcomer to FP, so: is there a way to make this work as I expect? Have I set up the whole problem improperly, or is this a limitation by design?
Instead of starting with ids.stream(), you can:
- get ids.spliterator()
- wrap it in a spliterator whose tryAdvance returns false once a termination flag has been set
- turn that back into a stream with StreamSupport.stream(Spliterator<T>, boolean)
- add some static helper methods to keep it functional

The resulting API could look about like this:
Accumulator acc = terminateableStream(ids, (stream, terminator) ->
stream.reduce(new Accumulator(terminator), f(), (x, y) -> null));
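For reference, here is a minimal sketch of what such a helper could look like; the names terminateableStream and Terminator are assumptions made up for this answer, and the Accumulator would call terminator.terminate() once it decides the remaining elements can be skipped:

import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical flag holder; the accumulating function flips it when it is done.
final class Terminator {
    private final AtomicBoolean stopped = new AtomicBoolean();
    public void terminate()       { stopped.set(true); }
    public boolean isTerminated() { return stopped.get(); }
}

final class TerminateableStreams {

    // Wraps the list's spliterator so that tryAdvance reports "no more elements"
    // as soon as the terminator flag is set, then hands the wrapped spliterator
    // back to the caller as a sequential stream.
    static <T, R> R terminateableStream(List<T> source,
                                        BiFunction<Stream<T>, Terminator, R> body) {
        Terminator terminator = new Terminator();
        Spliterator<T> original = source.spliterator();
        Spliterator<T> guarded = new Spliterator<T>() {
            @Override public boolean tryAdvance(Consumer<? super T> action) {
                return !terminator.isTerminated() && original.tryAdvance(action);
            }
            @Override public Spliterator<T> trySplit() {
                return null; // serial reduction only, never split
            }
            @Override public long estimateSize() {
                return original.estimateSize();
            }
            @Override public int characteristics() {
                // drop SIZED/SUBSIZED, since the stream may now end early
                return original.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED);
            }
        };
        return body.apply(StreamSupport.stream(guarded, false), terminator);
    }
}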
As for the remark that "an imaginary takeWhile(p : (A) => boolean) Stream API correspondent would also buy us nothing": it does work if the condition depends on the accumulator state rather than on the stream members - that's essentially the approach I've outlined above. Such a stateful predicate probably would be forbidden in a takeWhile provided by the JDK, but a custom implementation using spliterators is free to take a stateful approach.
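To illustrate (this is not JDK API; the helper below is a hand-rolled Java 8 sketch and the name takeWhile is chosen only for familiarity), a spliterator-based takeWhile can work with a predicate that closes over mutable state such as the accumulator:

import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class TakeWhileSupport {

    // Passes elements through until the predicate fails once, then stops
    // pulling from the source; the predicate is allowed to be stateful.
    static <T> Stream<T> takeWhile(Stream<T> source, Predicate<? super T> condition) {
        Spliterator<T> src = source.spliterator();
        Spliterator<T> limited =
                new Spliterators.AbstractSpliterator<T>(src.estimateSize(), 0) {
            private boolean stillTaking = true;
            @Override public boolean tryAdvance(Consumer<? super T> action) {
                if (!stillTaking) {
                    return false;
                }
                boolean hadNext = src.tryAdvance(element -> {
                    if (condition.test(element)) {
                        action.accept(element);
                    } else {
                        stillTaking = false; // first failure ends the stream
                    }
                });
                return hadNext && stillTaking;
            }
        };
        return StreamSupport.stream(limited, false);
    }
}

(Since Java 9 there is a real Stream.takeWhile, but it is specified for stateless predicates, which is exactly the restriction discussed here.)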
Of course, there may be an interesting, purely FP answer that helps solve this problem in the way you intend. In the meantime: why use FP at all, when the simple solution is pragmatically imperative, your original data source is a List anyway (already fully materialised), and you will use serial, not parallel, reduction? Write this instead:
@Override
public Result get() {
    Accumulator acc = new Accumulator();
    for (Integer id : ids) {
        if (acc.setA.size() <= 1) {
            // perform expensive ops and accumulate results
        }
        // Easy: bail out as soon as you know enough
        // ('enough' is a placeholder for whatever terminating condition you choose)
        if (enough)
            break;
    }
    return (acc.setA.size() > 1) ? Result.invalid() : Result.valid(acc.setB);
}
There is no real FP solution, simply because your entire accumulator isn’t FP. We can’t help you in this regard as we don’t know what it is actually doing. All we see is that it relies on two mutable collections and hence, can’t be a part of a pure FP solution.
If you accept the limitations, and that there is no clean way using the Stream API, you might strive for the simple way. The simple way incorporates a stateful Predicate, which is not the best thing around but sometimes unavoidable:
public Result get() {
    int limit = 1;
    Set<A> setA = new HashSet<>();
    Set<B> setB = new HashSet<>();
    return ids.stream().anyMatch(i -> {
        // perform expensive ops and accumulate results
        return setA.size() > limit;
    }) ? Result.invalid() : Result.valid(setB);
}
But I want to note that, given your specific logic (the result is considered invalid when the set grows too large), your attempt to avoid processing too many elements is an optimization of the erroneous case. You shouldn't waste effort on optimizing that. If a valid result is the result of processing all elements, then process all elements…