 

How to short-circuit a reduce() operation on a Stream?

This is essentially the same question as How to short-circuit reduce on Stream?. However, since that question focuses on a Stream of boolean values, and its answer cannot be generalized for other types and reduce operations, I'd like to ask the more general question.

How can we make a reduce on a stream so that it short-circuits when it encounters an absorbing element for the reducing operation?

The typical mathematical case would be 0 for multiplication. This stream:

    int product = IntStream.of(2, 3, 4, 5, 0, 7, 8)
            .reduce(1, (a, b) -> a * b);

will consume the last two elements (7 and 8) regardless of the fact that once 0 has been encountered the product is known.
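To make that behaviour visible, here is a minimal sketch that counts how many elements a plain reduce() pulls from the stream. The class name ReduceConsumesAll and the helper productAndCount are made up for illustration; only IntStream.peek and IntStream.reduce come from the JDK.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class ReduceConsumesAll {

    // Hypothetical helper: returns {product, number of elements seen by peek()}.
    static int[] productAndCount() {
        AtomicInteger seen = new AtomicInteger();
        int product = IntStream.of(2, 3, 4, 5, 0, 7, 8)
                .peek(i -> seen.incrementAndGet()) // count every element that reaches reduce()
                .reduce(1, (a, b) -> a * b);
        return new int[] { product, seen.get() };
    }

    public static void main(String[] args) {
        int[] r = productAndCount();
        // prints "product=0, elements seen=7"
        System.out.println("product=" + r[0] + ", elements seen=" + r[1]);
    }
}
```

All seven elements are counted, although the result is already fixed after the fifth.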

bowmore asked Sep 10 '15 06:09



2 Answers

Unfortunately, the Stream API has limited capabilities for creating your own short-circuit operations. A not-so-clean solution would be to throw a RuntimeException and catch it. Here's the implementation for IntStream, but it can be generalized for other stream types as well:

    public static int reduceWithCancelEx(IntStream stream, int identity,
                           IntBinaryOperator combiner, IntPredicate cancelCondition) {
        class CancelException extends RuntimeException {
            private final int val;

            CancelException(int val) {
                this.val = val;
            }
        }

        try {
            return stream.reduce(identity, (a, b) -> {
                int res = combiner.applyAsInt(a, b);
                if (cancelCondition.test(res))
                    throw new CancelException(res);
                return res;
            });
        } catch (CancelException e) {
            return e.val;
        }
    }

Usage example:

    int product = reduceWithCancelEx(
            IntStream.of(2, 3, 4, 5, 0, 7, 8).peek(System.out::println),
            1, (a, b) -> a * b, val -> val == 0);
    System.out.println("Result: "+product);

Output:

    2
    3
    4
    5
    0
    Result: 0

Note that even though it works with parallel streams, there is no guarantee that the other parallel tasks will stop as soon as one of them throws an exception. Sub-tasks that have already started will likely run to completion, so you may process more elements than expected.
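The generalization to other stream types might look like the sketch below, which applies the same exception trick to Stream<T>. The names GenericCancelReduce, CancelException and demo are assumptions made for illustration. Since a generic class cannot extend Throwable, the exception carries the value as an Object and the method casts it back; the stack trace is disabled because the exception is used only for control flow.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BinaryOperator;
import java.util.function.Predicate;
import java.util.stream.Stream;

public class GenericCancelReduce {

    // Hypothetical helper exception that carries the final value out of reduce().
    private static final class CancelException extends RuntimeException {
        final Object value;

        CancelException(Object value) {
            super(null, null, false, false); // no suppression, no stack trace
            this.value = value;
        }
    }

    @SuppressWarnings("unchecked")
    public static <T> T reduceWithCancelEx(Stream<T> stream, T identity,
            BinaryOperator<T> combiner, Predicate<? super T> cancelCondition) {
        try {
            return stream.reduce(identity, (a, b) -> {
                T res = combiner.apply(a, b);
                if (cancelCondition.test(res))
                    throw new CancelException(res); // abort the reduction early
                return res;
            });
        } catch (CancelException e) {
            return (T) e.value; // safe: value was produced by combiner.apply
        }
    }

    // Demo on a sequential stream: returns {product, elements consumed}.
    static int[] demo() {
        AtomicInteger seen = new AtomicInteger();
        int product = reduceWithCancelEx(
                Stream.of(2, 3, 4, 5, 0, 7, 8).peek(i -> seen.incrementAndGet()),
                1, (a, b) -> a * b, v -> v == 0);
        return new int[] { product, seen.get() };
    }

    public static void main(String[] args) {
        int[] r = demo();
        // prints "product=0, elements seen=5"
        System.out.println("product=" + r[0] + ", elements seen=" + r[1]);
    }
}
```

The same caveat about parallel streams applies to this version.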

Update: an alternative solution that is much longer but more parallel-friendly. It is based on a custom spliterator that returns at most one element, which is the result of accumulating all the underlying elements. In sequential mode, it does all the work in a single tryAdvance call. When it is split, each part generates its own single partial result, and the Stream engine reduces these partial results using the combiner function. Here's the generic version, but a primitive specialization is possible as well.

    final static class CancellableReduceSpliterator<T, A> implements Spliterator<A>,
            Consumer<T>, Cloneable {
        private Spliterator<T> source;
        private final BiFunction<A, ? super T, A> accumulator;
        private final Predicate<A> cancelPredicate;
        private final AtomicBoolean cancelled = new AtomicBoolean();
        private A acc;

        CancellableReduceSpliterator(Spliterator<T> source, A identity,
                BiFunction<A, ? super T, A> accumulator, Predicate<A> cancelPredicate) {
            this.source = source;
            this.acc = identity;
            this.accumulator = accumulator;
            this.cancelPredicate = cancelPredicate;
        }

        @Override
        public boolean tryAdvance(Consumer<? super A> action) {
            if (source == null || cancelled.get()) {
                source = null;
                return false;
            }
            while (!cancelled.get() && source.tryAdvance(this)) {
                if (cancelPredicate.test(acc)) {
                    cancelled.set(true);
                    break;
                }
            }
            source = null;
            action.accept(acc);
            return true;
        }

        @Override
        public void forEachRemaining(Consumer<? super A> action) {
            tryAdvance(action);
        }

        @Override
        public Spliterator<A> trySplit() {
            if(source == null || cancelled.get()) {
                source = null;
                return null;
            }
            Spliterator<T> prefix = source.trySplit();
            if (prefix == null)
                return null;
            try {
                @SuppressWarnings("unchecked")
                CancellableReduceSpliterator<T, A> result =
                    (CancellableReduceSpliterator<T, A>) this.clone();
                result.source = prefix;
                return result;
            } catch (CloneNotSupportedException e) {
                throw new InternalError();
            }
        }

        @Override
        public long estimateSize() {
            // let's pretend we have the same number of elements
            // as the source, so the pipeline engine parallelize it in the same way
            return source == null ? 0 : source.estimateSize();
        }

        @Override
        public int characteristics() {
            return source == null ? SIZED : source.characteristics() & ORDERED;
        }

        @Override
        public void accept(T t) {
            this.acc = accumulator.apply(this.acc, t);
        }
    }

Methods which are analogous to Stream.reduce(identity, accumulator, combiner) and Stream.reduce(identity, combiner), but with cancelPredicate:

    public static <T, U> U reduceWithCancel(Stream<T> stream, U identity,
            BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner,
            Predicate<U> cancelPredicate) {
        return StreamSupport
                .stream(new CancellableReduceSpliterator<>(stream.spliterator(), identity,
                        accumulator, cancelPredicate), stream.isParallel()).reduce(combiner)
                .orElse(identity);
    }

    public static <T> T reduceWithCancel(Stream<T> stream, T identity,
            BinaryOperator<T> combiner, Predicate<T> cancelPredicate) {
        return reduceWithCancel(stream, identity, combiner, combiner, cancelPredicate);
    }

Let's test both versions and count how many elements are actually processed. Let's put the 0 close to the end. Exception version:

    AtomicInteger count = new AtomicInteger();
    int product = reduceWithCancelEx(
            IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0)
                    .parallel().peek(i -> count.incrementAndGet()), 1,
            (a, b) -> a * b, x -> x == 0);
    System.out.println("product: " + product + "/count: " + count);
    Thread.sleep(1000);
    System.out.println("product: " + product + "/count: " + count);

Typical output:

    product: 0/count: 281721
    product: 0/count: 500001

So while the result is returned after only some of the elements have been processed, the tasks continue working in the background and the counter keeps increasing. Here's the spliterator version:

    AtomicInteger count = new AtomicInteger();
    int product = reduceWithCancel(
            IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0)
                    .parallel().peek(i -> count.incrementAndGet()).boxed(),
                    1, (a, b) -> a * b, x -> x == 0);
    System.out.println("product: " + product + "/count: " + count);
    Thread.sleep(1000);
    System.out.println("product: " + product + "/count: " + count);

Typical output:

    product: 0/count: 281353
    product: 0/count: 281353

All the tasks are actually finished when the result is returned.

Tagir Valeev answered Sep 24 '22 06:09


A general short-circuiting static reduce method can be implemented using the spliterator of a stream. It even turned out to be not very complicated! Using spliterators often seems to be the way to go when one wants to work with streams in a more flexible way.

    public static <T> T reduceWithCancel(Stream<T> s, T acc, BinaryOperator<T> op, Predicate<? super T> cancelPred) {
        BoxConsumer<T> box = new BoxConsumer<T>();
        Spliterator<T> splitr = s.spliterator();

        while (!cancelPred.test(acc) && splitr.tryAdvance(box)) {
            acc = op.apply(acc, box.value);
        }

        return acc;
    }

    public static class BoxConsumer<T> implements Consumer<T> {
        T value = null;
        public void accept(T t) {
            value = t;
        }
    }

Usage:

    int product = reduceWithCancel(
        Stream.of(1, 2, 0, 3, 4).peek(System.out::println),
        1, (acc, i) -> acc * i, i -> i == 0);

    System.out.println("Result: " + product);

Output:

    1
    2
    0
    Result: 0
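Because the loop stops pulling from the spliterator once the predicate holds, this approach should also terminate on an infinite stream, as long as an absorbing element eventually shows up. The following is a self-contained sketch of that case. The class name InfiniteReduceDemo and the helper countdownProduct are made up for illustration, and an AtomicReference stands in for BoxConsumer to keep the example short.

```java
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BinaryOperator;
import java.util.function.Predicate;
import java.util.stream.Stream;

public class InfiniteReduceDemo {

    // Same loop as the method above, with AtomicReference used as the one-element box.
    static <T> T reduceWithCancel(Stream<T> s, T acc, BinaryOperator<T> op,
            Predicate<? super T> cancelPred) {
        AtomicReference<T> box = new AtomicReference<>();
        Spliterator<T> splitr = s.spliterator();

        // Stop as soon as the accumulator satisfies the predicate or the source is exhausted.
        while (!cancelPred.test(acc) && splitr.tryAdvance(box::set)) {
            acc = op.apply(acc, box.get());
        }
        return acc;
    }

    // Multiplies 5, 4, 3, 2, 1, 0, -1, ... and stops once the product becomes 0.
    static int countdownProduct() {
        return reduceWithCancel(
                Stream.iterate(5, i -> i - 1), // infinite sequential stream
                1, (a, b) -> a * b, v -> v == 0);
    }

    public static void main(String[] args) {
        // prints "Result: 0"
        System.out.println("Result: " + countdownProduct());
    }
}
```

A plain Stream.reduce on the same infinite stream would never return.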

The method could be generalised to perform other kinds of terminal operations.

This is based loosely on this answer about a take-while operation.

I don't know anything about the parallelisation potential of this.

Lii answered Sep 23 '22 06:09