 

RxJava: buffer items until some condition is true for current item

Tags: java, rx-java

Here's a snippet I'm trying to figure out:

class RaceCondition {

    Subject<Integer, Integer> subject = PublishSubject.create();

    public void entryPoint(Integer data) {
        subject.onNext(data);
    }

    public void client() {
        subject /*some operations*/
                .buffer(getClosingSelector())
                .subscribe(/*handle results*/);
    }

    private Observable<Integer> getClosingSelector() {
        return subject /* some filtering */;
    }
}

There's a Subject that accepts events from the outside. There's a client subscribed to this subject that processes the events and also buffers them. The main idea here is that the buffered items should be emitted each time based on some condition calculated using an item from the stream.

For this purpose the buffer boundary itself listens to the subject.

An important desired behaviour: whenever the boundary emits an item, that item should also be included in the buffer emission it triggers. That's not the case with the current configuration: the item (at least that's what I think) is emitted from the closing selector before it reaches the buffer, so it's not included in the current emission but is left behind waiting for the next one.
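To make the difference concrete, here is a minimal plain-Java sketch (no RxJava involved) of the two orderings described above. The class and method names are hypothetical, and the model assumes the boundary is evaluated either just before or just after the item is added to the buffer; it only illustrates the ordering and does not reproduce the library's internals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical illustration: how the order of "close" and "add" changes the buffers. */
final class BoundarySemantics {

    /** The boundary closes the buffer BEFORE the item is added (the behaviour suspected in the question). */
    public static <T> List<List<T>> closeBeforeAdd(List<T> items, Predicate<T> isBoundary) {
        List<List<T>> out = new ArrayList<>();
        List<T> buffer = new ArrayList<>();
        for (T item : items) {
            if (isBoundary.test(item)) {
                out.add(buffer);            // emitted without the boundary item
                buffer = new ArrayList<>();
            }
            buffer.add(item);               // boundary item lands in the next buffer
        }
        if (!buffer.isEmpty()) {
            out.add(buffer);
        }
        return out;
    }

    /** The item is added first, THEN the boundary closes the buffer (the desired behaviour). */
    public static <T> List<List<T>> addThenClose(List<T> items, Predicate<T> isBoundary) {
        List<List<T>> out = new ArrayList<>();
        List<T> buffer = new ArrayList<>();
        for (T item : items) {
            buffer.add(item);               // boundary item is part of the current buffer
            if (isBoundary.test(item)) {
                out.add(buffer);
                buffer = new ArrayList<>();
            }
        }
        if (!buffer.isEmpty()) {
            out.add(buffer);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        Predicate<Integer> multipleOfThree = n -> n % 3 == 0;
        System.out.println(closeBeforeAdd(items, multipleOfThree)); // [[1, 2], [3, 4, 5], [6, 7]]
        System.out.println(addThenClose(items, multipleOfThree));   // [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

In the first variant each boundary item (3 and 6) opens the next buffer instead of closing the current one, which matches the "left behind" symptom; the second variant is the behaviour being asked for.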

Is there a way to make the closing selector wait for the item to be buffered first? If not, is there another way to buffer and release items based on the next incoming item?

AndroidEx asked Mar 13 '23 00:03


1 Answer

If I understand correctly, you want to buffer items until a predicate, evaluated on each incoming item, signals that the buffer should be released. You can do this with a complicated set of operators, but perhaps it's easier to just write a custom operator:

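import java.util.ArrayList;
import java.util.List;

import rx.Observable.Operator;
import rx.Subscriber;
import rx.functions.Func1;

// Buffers items and emits the buffer (current item included)
// whenever the predicate returns true for that item.
public final class BufferUntil<T>
        implements Operator<List<T>, T> {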

    final Func1<T, Boolean> boundaryPredicate;

    public BufferUntil(Func1<T, Boolean> boundaryPredicate) {
        this.boundaryPredicate = boundaryPredicate;
    }

    @Override
    public Subscriber<? super T> call(
            Subscriber<? super List<T>> child) {
        BufferWhileSubscriber parent = 
                new BufferWhileSubscriber(child);
        child.add(parent);
        return parent;
    }

    final class BufferWhileSubscriber extends Subscriber<T> {
        final Subscriber<? super List<T>> actual;

        List<T> buffer = new ArrayList<>();

        /**
         * @param actual the downstream subscriber that receives each completed buffer
         */
        public BufferWhileSubscriber(
                Subscriber<? super List<T>> actual) {
            this.actual = actual;
        }

        @Override
        public void onNext(T t) {
            buffer.add(t);
            if (boundaryPredicate.call(t)) {
                actual.onNext(buffer);
                buffer = new ArrayList<>();
            }
        }

        @Override
        public void onError(Throwable e) {
            buffer = null;
            actual.onError(e);
        }

        @Override
        public void onCompleted() {
            List<T> b = buffer;
            buffer = null;
            if (!b.isEmpty()) {
                actual.onNext(b);
            }
            actual.onCompleted();
        }
    }
}
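
The add-then-test logic of the subscriber can be exercised without RxJava on the classpath. The following is a minimal library-free sketch, not the operator itself: `BufferUntilSketch`, its `Consumer` downstream, and the `run` helper are hypothetical stand-ins for the `Subscriber` plumbing above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical stand-in for the operator's subscriber, using plain callbacks. */
final class BufferUntilSketch<T> {
    private final Predicate<T> boundaryPredicate;
    private final Consumer<List<T>> downstream; // plays the role of the child Subscriber
    private List<T> buffer = new ArrayList<>();

    BufferUntilSketch(Predicate<T> boundaryPredicate, Consumer<List<T>> downstream) {
        this.boundaryPredicate = boundaryPredicate;
        this.downstream = downstream;
    }

    void onNext(T t) {
        buffer.add(t);                      // the item joins the buffer first
        if (boundaryPredicate.test(t)) {    // then decides whether to flush it
            downstream.accept(buffer);
            buffer = new ArrayList<>();
        }
    }

    void onCompleted() {
        if (!buffer.isEmpty()) {            // flush whatever is left over
            downstream.accept(buffer);
        }
        buffer = new ArrayList<>();
    }

    /** Feeds all items through the sketch and collects every emitted buffer. */
    public static <T> List<List<T>> run(List<T> items, Predicate<T> boundary) {
        List<List<T>> emitted = new ArrayList<>();
        BufferUntilSketch<T> sketch = new BufferUntilSketch<>(boundary, emitted::add);
        for (T item : items) {
            sketch.onNext(item);
        }
        sketch.onCompleted();
        return emitted;
    }

    public static void main(String[] args) {
        // Flush whenever an item is >= 10.
        System.out.println(run(Arrays.asList(5, 1, 10, 2, 3, 12), n -> n >= 10));
        // [[5, 1, 10], [2, 3, 12]]
    }
}
```

With RxJava 1.x, the operator itself would typically be applied through `lift`, for example `subject.lift(new BufferUntil<Integer>(predicate))`, where `predicate` is whatever `Func1<Integer, Boolean>` expresses the release condition.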

akarnokd answered Mar 16 '23 00:03