Java 8: stop reduction operation from examining all Stream elements

I am trying to understand whether there is a way to terminate a reduction operation early, without examining the whole stream, and I cannot figure out a way.

The use case is roughly as follows: let there be a long list of Integers which needs to be folded into an Accumulator. Each element examination is potentially expensive, so within the accumulating function I perform a check on the incoming Accumulator to see whether we even need to perform the expensive operation - if we don't, I simply return the accumulator unchanged.

This is obviously fine for small(er) lists, but huge lists incur unnecessary element-visiting costs that I'd like to avoid.

Here's a code sketch - assume serial reductions only.

class Accumulator {
    private final Set<A> setA = new HashSet<>();
    private final Set<B> setB = new HashSet<>();
}

class ResultSupplier implements Supplier<Result> {

    private final List<Integer> ids;

    @Override
    public Result get() {
        Accumulator acc = ids.stream().reduce(new Accumulator(), f(), (x, y) -> null);

        return (acc.setA.size() > 1) ? Result.invalid() : Result.valid(acc.setB);
    }

    private static BiFunction<Accumulator, Integer, Accumulator> f() {
        return (acc, element) -> {
            if (acc.setA.size() <= 1) {
                // perform expensive ops and accumulate results
            }
            return acc;
        };
    }
}

In addition to having to traverse the whole Stream, there is another thing I dislike - I have to check the same condition twice (namely, the setA size check).

I have considered map() and collect() operations, but they seemed like more of the same; I didn't find that they materially change the fact that I can't finish the fold without examining the entire stream.

Additionally, my thinking is that an imaginary takeWhile(p : (A) => boolean) counterpart in the Stream API would also buy us nothing, as the terminating condition depends on the accumulator, not on the stream elements per se.

Bear in mind that I am a relative newcomer to FP, so - is there a way to make this work as I expect? Have I set up the whole problem improperly, or is this limitation by design?

asked Jun 09 '15 by quantum



3 Answers

Instead of starting with ids.stream(), you can:

  1. use ids.spliterator()
  2. wrap resulting spliterator into custom spliterator that has a volatile boolean flag
  3. have the custom spliterator's tryAdvance return false if the flag is changed
  4. turn your custom spliterator into a stream with StreamSupport.stream(Spliterator<T>, boolean)
  5. continue your stream pipeline as before
  6. shut down the stream by toggling the boolean when your accumulator is full

Add some static helper methods to keep it functional (a sketch of such a helper follows the API example below).

The resulting API could look something like this:

Accumulator acc = terminateableStream(ids, (stream, terminator) ->
   stream.reduce(new Accumulator(terminator), f(), (x, y) -> null));
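
A minimal sketch of what such a helper could look like - assuming an AtomicBoolean for the flag (so that lambdas can capture it) and hypothetical names like TerminateableStreams and terminateableStream; none of this is JDK API:

import java.util.Collection;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class TerminateableStreams {

    // Hypothetical helper, not part of the JDK: wraps the source's spliterator so
    // that tryAdvance() stops delivering elements once the terminator flag is set.
    static <T, R> R terminateableStream(Collection<T> source,
                                        BiFunction<Stream<T>, AtomicBoolean, R> body) {
        Spliterator<T> delegate = source.spliterator();
        AtomicBoolean terminator = new AtomicBoolean(false);
        Spliterator<T> terminateable = new Spliterators.AbstractSpliterator<T>(
                delegate.estimateSize(), delegate.characteristics()) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                // report "no more elements" as soon as the flag has been raised
                return !terminator.get() && delegate.tryAdvance(action);
            }
        };
        return body.apply(StreamSupport.stream(terminateable, false), terminator);
    }
}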

Additionally, my thinking is that imaginary takeWhile(p : (A) => boolean) Stream API correspondent would also buy us nothing

It does work if the condition depends on the accumulator state and not on the stream members. That's essentially the approach I've outlined above.

It probably would be forbidden in a takeWhile provided by the JDK but a custom implementation using spliterators is free to take a stateful approach.
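
Tying it together, the accumulating function itself can raise the flag once the accumulator is full - again only a sketch, which assumes that Accumulator keeps a reference to the terminator passed to new Accumulator(terminator) above:

private static BiFunction<Accumulator, Integer, Accumulator> f() {
    return (acc, element) -> {
        if (acc.setA.size() <= 1) {
            // perform expensive ops and accumulate results
        }
        if (acc.setA.size() > 1) {
            acc.terminator.set(true); // assumed field; no further elements will be delivered
        }
        return acc;
    };
}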

answered Oct 03 '22 by the8472


Of course, there will be an interesting, purely FP answer that might help solve this problem in the way you intend.

In the meantime, why use FP at all when the simple solution is pragmatically imperative? Your original data source is a List, which is already fully materialised, and you will be using serial reduction, not parallel reduction. Write this instead:

@Override
public Result get() {
    Accumulator acc = new Accumulator();

    for (Integer id : ids) {
        if (acc.setA.size() <= 1) {
            // perform expensive ops and accumulate results
        }

        // Easy: stop as soon as we know the result will be invalid
        if (acc.setA.size() > 1)
            break;
    }

    return (acc.setA.size() > 1) ? Result.invalid() : Result.valid(acc.setB);
}
answered Oct 04 '22 by Lukas Eder


There is no real FP solution, simply because your entire accumulator isn’t FP. We can’t help you in this regard as we don’t know what it is actually doing. All we see is that it relies on two mutable collections and hence, can’t be a part of a pure FP solution.

If you accept the limitations, and that there is no clean way using the Stream API, you might settle for the simple way. The simple way incorporates a stateful Predicate, which is not the best thing around but sometimes unavoidable:

public Result get() {
    int limit = 1;
    Set<A> setA = new HashSet<>();
    Set<B> setB = new HashSet<>();
    // stateful predicate: it mutates setA/setB as a side effect and returns
    // true as soon as the accumulated state proves the result invalid
    return ids.stream().anyMatch(i -> {
        // perform expensive ops and accumulate results
        return setA.size() > limit;
    }) ? Result.invalid() : Result.valid(setB);
}

But I want to note that, given your specific logic, i.e. that your result is considered invalid when the set grows too large, your attempt to avoid processing too many elements is an optimization of the erroneous case. You shouldn't waste effort on optimizing that. If a valid result requires processing all elements, then process all elements…

answered Oct 03 '22 by Holger