
Rx - Divide stream into segments (lists) by condition

I have an RX producer that creates a stream of strings like so (simplified version of the real stream):

A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6....

The stream is endless, but ordered. After the strings that start with A run out, B starts. When B runs out, C starts... when Z runs out, we move to AA1 etc. There's an unknown number of A's, B's etc., but it's typically 10-30 instances per letter.

I'm looking for a way to divide this stream into blocks of all A's: A1 A2 A3, all B's: B1 B2, all C's: C1 C2 C3 C4 C5 C6 etc. Each block can be either an observable (which I'll turn into a list) or simply a list.

I tried several different approaches using RxJava, all of which failed. Among the things that didn't work are:

  • Group by: since the stream is endless, the per-letter observable never completes. When the A's run out and the B's start, A's Observable stays open, so the number of open observables keeps growing.

  • Window/Buffer with distinctUntilChanged: I use "distinctUntilChanged" on the original stream to output the first item of each group (the first A, first B etc.). Then I pass that stream to a "window" or "buffer" operator as the boundary between windows/buffers. That didn't work; all I got was empty lists.

What's a correct solution using RX? I'd prefer a Java solution, but solutions in other implementations of RX that can be easily converted to Java are also very welcome.

Malt asked Sep 17 '15 07:09



3 Answers

You can use toListWhile from rxjava-extras:

Observable<String> source = 
    Observable.just("A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");
source.compose(Transformers.<String> toListWhile(
            (list, t) -> list.isEmpty() 
                         || list.get(0).charAt(0) == t.charAt(0)))
      .forEach(System.out::println);

It does what @akarnokd has done under the covers and is unit tested.
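For readers who want to see the idea without the library, the behaviour described above can be approximated in plain Java. This is only a minimal sketch, not the rxjava-extras implementation: the class `ToListWhileSketch` and its method are hypothetical names, and it works on a finite `Iterable` instead of an `Observable`. It keeps adding items to the current list while the predicate holds and starts a new list when it fails.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical helper for illustration; not part of rxjava-extras.
final class ToListWhileSketch {

    // Splits the source into lists; a new list starts whenever
    // condition.test(currentList, nextItem) returns false.
    static <T> List<List<T>> toListWhile(Iterable<T> source,
                                         BiPredicate<List<T>, T> condition) {
        List<List<T>> result = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : source) {
            if (!condition.test(current, item)) {
                if (!current.isEmpty()) {
                    result.add(current);      // close the finished block
                }
                current = new ArrayList<>();  // start the next block
            }
            current.add(item);
        }
        if (!current.isEmpty()) {
            result.add(current);              // flush the last block of a finite source
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> source =
                Arrays.asList("A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");
        // Same predicate as in the answer: same first character as the block's first item.
        System.out.println(toListWhile(source,
                (list, t) -> list.isEmpty() || list.get(0).charAt(0) == t.charAt(0)));
        // prints [[A1, A2, A3], [B1, B2, B3], [C1], [D1]]
    }
}
```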

Dave Moten answered Oct 26 '22 08:10



Here is a way I'd solve this:

Observable<String> source = Observable.from(
        "A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");

Observable<List<String>> output = Observable.defer(() -> {
    List<String> buffer = new ArrayList<>();
    return 
            Observable.concat(
                source.concatMap(new Function<String, Observable<List<String>>>() {
                    String lastKey;
                    @Override
                    public Observable<List<String>> apply(String t) {
                        String key = t.substring(0, 1);
                        if (lastKey != null && !key.equals(lastKey)) {
                            List<String> b = new ArrayList<>(buffer);
                            buffer.clear();
                            buffer.add(t);
                            lastKey = key;
                            return Observable.just(b);
                        }
                        lastKey = key;
                        buffer.add(t);
                        return Observable.empty();
                    }
                }),
                Observable.just(1)
                .flatMap(v -> {
                    if (buffer.isEmpty()) {
                        return Observable.empty();
                    }
                    return Observable.just(buffer);
                })
            );
    }
);

output.subscribe(System.out::println);

This is how it works:

  • I'm using defer because we need a per-subscriber buffer, not a global one.
  • I concatenate the buffering with the emission of the last buffer, in case the source happens to be finite.
  • I use concatMap and add to a buffer until the key changes; until then, I emit empty Observables. Once the key changes, I emit the contents of the buffer and start a new one.
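The three points above can also be sketched without any Rx types. The following is a hedged, push-style illustration in plain Java, not the code from this answer: `KeyChangeBuffer`, `onItem` and `onEnd` are hypothetical names, and keys are assumed to be non-null. Each instance owns its buffer (the role defer plays above), a block is handed downstream when the key changes, and `onEnd` flushes the last block if the source turns out to be finite.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical push-style helper for illustration; assumes non-null keys.
final class KeyChangeBuffer<T, K> {
    private final Function<? super T, ? extends K> keyOf;
    private final Consumer<? super List<T>> downstream;
    private List<T> buffer = new ArrayList<>(); // per-instance state, never shared
    private K lastKey;

    KeyChangeBuffer(Function<? super T, ? extends K> keyOf,
                    Consumer<? super List<T>> downstream) {
        this.keyOf = keyOf;
        this.downstream = downstream;
    }

    // Called for every incoming item.
    void onItem(T item) {
        K key = keyOf.apply(item);
        if (!buffer.isEmpty() && !key.equals(lastKey)) {
            downstream.accept(buffer);   // key changed: emit the finished block
            buffer = new ArrayList<>();  // and start a new one
        }
        lastKey = key;
        buffer.add(item);
    }

    // Called once if the source ends; emits whatever is still buffered.
    void onEnd() {
        if (!buffer.isEmpty()) {
            downstream.accept(buffer);
            buffer = new ArrayList<>();
        }
    }

    public static void main(String[] args) {
        List<List<String>> blocks = new ArrayList<>();
        KeyChangeBuffer<String, String> segmenter =
                new KeyChangeBuffer<String, String>(s -> s.substring(0, 1), blocks::add);
        for (String s : new String[] {"A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1"}) {
            segmenter.onItem(s);
        }
        segmenter.onEnd();
        System.out.println(blocks); // prints [[A1, A2, A3], [B1, B2, B3], [C1], [D1]]
    }
}
```

On an endless source `onEnd` is simply never called, so the block currently being filled stays buffered until the next key arrives.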
akarnokd answered Oct 26 '22 06:10



After looking at akarnokd's and Dave's answers, I came up with my own solution by implementing a custom Rx operator called BufferWhile. It seems to work just as well as the other solutions (someone please correct me if I'm wrong), but it seems more straightforward:

public class RxBufferWhileOperator<T, U> implements Operator<List<T>, T>{

    private final Func1<? super T, ? extends U> keyGenerator;

    public RxBufferWhileOperator(Func1<? super T, ? extends U> keyGenerator) {
        this.keyGenerator = keyGenerator;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super List<T>> s) {
        return new Subscriber<T>(s) {

            private ArrayList<T> buffer = null;
            private U currentKey = null;

            @Override
            public void onCompleted() {
                submitAndClearBuffer();
                s.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                submitAndClearBuffer(); //Optional, remove if submitting partial buffers doesn't make sense in your case
                s.onError(e);
            }

            @Override
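            public void onNext(T t) {
                U key = keyGenerator.call(t); // Compute the key once per item
                if (buffer == null || !java.util.Objects.equals(currentKey, key)) {
                    currentKey = key;
                    boolean emitted = submitAndClearBuffer();
                    buffer.add(t);
                    if (!emitted) {
                        request(1); // First item: nothing went downstream, so request one more
                    }
                } else {
                    buffer.add(t);
                    request(1); // Request additional T since we "swallowed" the incoming result without calling subsequent subscribers
                }
            }

            // Emits the current buffer if it has content; returns true if something was emitted
            private boolean submitAndClearBuffer() {
                boolean emitted = buffer != null && !buffer.isEmpty();
                if (emitted) {
                    s.onNext(buffer);
                }
                buffer = new ArrayList<>();
                return emitted;
            }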
        };
    }
}

I can apply this operator to the original observable using lift, and get an observable that emits lists of strings.
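One detail worth checking is the key function itself. The question says the stream continues with AA1 after the Z's, and a key based only on the first character would presumably merge neighbouring prefixes such as AA and AB into one block. Below is a minimal sketch of a key function that returns the whole leading letter run. It assumes every item is a non-digit prefix followed by digits, which the question suggests but does not spell out, and `LetterPrefixKey` is a hypothetical name.

```java
// Hypothetical key function for illustration; assumes items look like "AB12".
final class LetterPrefixKey {

    // Returns the leading run of non-digit characters, e.g. "AB12" -> "AB".
    static String of(String item) {
        int i = 0;
        while (i < item.length() && !Character.isDigit(item.charAt(i))) {
            i++;
        }
        return item.substring(0, i);
    }

    public static void main(String[] args) {
        System.out.println(of("A1"));   // prints A
        System.out.println(of("AA1"));  // prints AA
        System.out.println(of("AB12")); // prints AB
    }
}
```

With the operator above, such a function could be supplied as the keyGenerator, for example `source.lift(new RxBufferWhileOperator<String, String>(LetterPrefixKey::of))`.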

Malt answered Oct 26 '22 08:10
