I have an RX producer that creates a stream of strings like so (simplified version of the real stream):
A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6....
The stream is endless, but ordered. So after the strings that start with A run out, B starts. When B runs out, C starts... when Z runs out, we move to AA1, etc. There's an unknown number of A's, B's, etc., but it's typically 10-30 instances per letter.
I'm looking for a way to divide this stream into blocks: all A's (A1 A2 A3), all B's (B1 B2), all C's (C1 C2 C3 C4 C5 C6), etc. Each block can be either an observable (which I'll turn into a list) or simply a list.
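To pin down the desired output, here is a minimal plain-Java sketch (no Rx, finite input only) of the same grouping: consecutive items with the same letter prefix form one block. The names ConsecutiveGroups, group and letterPrefix are hypothetical, and letterPrefix assumes every item is letters followed by digits. Keying on the whole prefix rather than the first character keeps an AA block separate from a following AB block, if multi-letter prefixes can follow each other in the real stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical helper: splits an ordered, finite list into runs of equal keys. */
final class ConsecutiveGroups {
    private ConsecutiveGroups() {}

    static <T, K> List<List<T>> group(List<T> items, Function<? super T, ? extends K> keyOf) {
        List<List<T>> groups = new ArrayList<>();
        List<T> current = new ArrayList<>();
        K currentKey = null;
        for (T item : items) {
            K key = keyOf.apply(item);
            // Only a key change can close the current run
            if (!current.isEmpty() && !Objects.equals(key, currentKey)) {
                groups.add(current);
                current = new ArrayList<>();
            }
            current.add(item);
            currentKey = key;
        }
        // The last run has no following key, so the end of input closes it
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }

    /** Letter prefix of an item, e.g. "AA1" -> "AA" (assumes letters, then digits). */
    static String letterPrefix(String s) {
        int i = 0;
        while (i < s.length() && Character.isLetter(s.charAt(i))) {
            i++;
        }
        return s.substring(0, i);
    }
}
```

A block is closed only when an item with a different key arrives (or the input ends); with an endless source, that "next key" is the only signal that a block is complete.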
I tried several different approaches using RxJava, all of which failed. Among the things that didn't work are:
Group by: since the stream is endless, the per-letter observable never completes. When the A's run out and the B's start, the A observable still doesn't complete, so the number of open observables keeps growing.
Window/Buffer with distinctUntilChanged: I use distinctUntilChanged on the original stream to output the first item of each group (the first A, the first B, etc.). Then I pass that stream to a window or buffer operator as the boundary between windows/buffers. That didn't work, and all I got was empty lists.
What's a correct solution using RX? I'd prefer a Java solution, but solutions in other implementations of RX that can be easily converted to Java are also very welcome.
You can use toListWhile from rxjava-extras:
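import com.github.davidmoten.rx.Transformers;
import rx.Observable;

Observable<String> source =
        Observable.just("A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");
// Keep adding items to the current list while they share the first character
source.compose(Transformers.<String> toListWhile(
        (list, t) -> list.isEmpty()
                || list.get(0).charAt(0) == t.charAt(0)))
      .forEach(System.out::println);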
It does what @akarnokd has done under the covers and is unit tested.
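Assuming toListWhile adds each item to the current list while the condition returns true, and otherwise emits the list and starts a new one with that item, its behaviour on a finite input can be modelled in plain Java. This is only an illustrative sketch, not the rxjava-extras source; ListWhileModel is a hypothetical name, and the test reuses the predicate from the snippet above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

/** Hypothetical plain-Java model of the toListWhile idea on a finite list. */
final class ListWhileModel {
    private ListWhileModel() {}

    static <T> List<List<T>> toListWhile(List<T> items,
                                         BiPredicate<? super List<T>, ? super T> condition) {
        List<List<T>> out = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : items) {
            // An empty list always accepts the item, so no empty lists are emitted
            if (!current.isEmpty() && !condition.test(current, item)) {
                // Condition failed: emit the finished list and start a new one
                out.add(current);
                current = new ArrayList<>();
            }
            current.add(item);
        }
        if (!current.isEmpty()) {
            out.add(current); // flush the last list when the input ends
        }
        return out;
    }
}
```

With the predicate from the answer, the sample input yields [A1, A2, A3], [B1, B2, B3], [C1] and [D1].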
Here is a way I'd solve this:
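import io.reactivex.Observable;
import io.reactivex.functions.Function;
import java.util.ArrayList;
import java.util.List;

Observable<String> source = Observable.fromArray(
        "A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");

Observable<List<String>> output = Observable.defer(() -> {
    // defer() gives every subscriber its own buffer (and its own lastKey below)
    List<String> buffer = new ArrayList<>();
    return Observable.concat(
            source.concatMap(new Function<String, Observable<List<String>>>() {
                String lastKey;

                @Override
                public Observable<List<String>> apply(String t) {
                    String key = t.substring(0, 1);
                    if (lastKey != null && !key.equals(lastKey)) {
                        // Key changed: emit a copy of the finished group, start a new one
                        List<String> b = new ArrayList<>(buffer);
                        buffer.clear();
                        buffer.add(t);
                        lastKey = key;
                        return Observable.just(b);
                    }
                    // First item or same key: keep buffering, emit nothing
                    lastKey = key;
                    buffer.add(t);
                    return Observable.empty();
                }
            }),
            // Subscribed only after the source completes: flush the last group, if any
            Observable.just(1)
                    .flatMap(v -> {
                        if (buffer.isEmpty()) {
                            return Observable.empty();
                        }
                        return Observable.just(buffer);
                    })
    );
});

output.subscribe(System.out::println);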
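This is how it works: defer() creates a fresh buffer and a fresh concatMap function (holding lastKey) for each subscriber. The concatMap function keeps adding items to the buffer while the first character stays the same and emits nothing; when the key changes, it emits a copy of the finished group and starts a new group with the current item. Because concat subscribes to its second source only after the first one completes, the trailing Observable.just(1).flatMap(...) runs after the source ends and emits the last, still-open group.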
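With the endless source from the question, a group is released only when the first item of the next group arrives; the concat tail only matters for finite sources. The same state machine can be sketched in plain Java without Rx. PrefixGrouper is a hypothetical class: its onNext plays the role of the concatMap function, its onComplete plays the role of the Observable.just(1).flatMap(...) tail, and each run(...) call uses a fresh instance, as defer() does per subscriber.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical push-based model of the concatMap + concat(tail) answer. */
final class PrefixGrouper {
    private final List<String> buffer = new ArrayList<>();
    private String lastKey;

    /** Returns the previous group when the key changes, otherwise null. */
    List<String> onNext(String t) {
        String key = t.substring(0, 1); // same first-character key as the answer
        List<String> finished = null;
        if (lastKey != null && !key.equals(lastKey)) {
            finished = new ArrayList<>(buffer); // copy before reusing the buffer
            buffer.clear();
        }
        lastKey = key;
        buffer.add(t);
        return finished;
    }

    /** Mirrors the concat tail: returns whatever is still buffered, or null. */
    List<String> onComplete() {
        return buffer.isEmpty() ? null : new ArrayList<>(buffer);
    }

    /** Drives a fresh grouper over a finite input, like one subscription. */
    static List<List<String>> run(List<String> input) {
        PrefixGrouper g = new PrefixGrouper();
        List<List<String>> out = new ArrayList<>();
        for (String t : input) {
            List<String> finished = g.onNext(t);
            if (finished != null) {
                out.add(finished);
            }
        }
        List<String> last = g.onComplete();
        if (last != null) {
            out.add(last);
        }
        return out;
    }
}
```

Without the onComplete step, the sketch would never report [D1], which is exactly the group the concat tail exists to flush.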
After looking at akarnokd's and Dave's answers, I came up with my own solution by implementing a custom Rx operator called BufferWhile. It seems to work just as well as the other solutions (someone please correct me if I'm wrong), but it seems more straightforward:
import java.util.ArrayList;
import java.util.List;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.functions.Func1;

public class RxBufferWhileOperator<T, U> implements Operator<List<T>, T> {
    private final Func1<? super T, ? extends U> keyGenerator;

    public RxBufferWhileOperator(Func1<? super T, ? extends U> keyGenerator) {
        this.keyGenerator = keyGenerator;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super List<T>> s) {
        // Sharing s means downstream requests for lists become upstream requests for items
        return new Subscriber<T>(s) {
            private List<T> buffer = new ArrayList<>();
            private U currentKey = null;

            @Override
            public void onCompleted() {
                submitAndClearBuffer();
                s.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                submitAndClearBuffer(); // Optional, remove if submitting partial buffers doesn't make sense in your case
                s.onError(e);
            }

            @Override
            public void onNext(T t) {
                U key = keyGenerator.call(t); // compute the key once per item
                if (currentKey == null || !currentKey.equals(key)) {
                    currentKey = key;
                    boolean emitted = submitAndClearBuffer();
                    buffer.add(t);
                    if (!emitted) {
                        request(1); // First item: nothing went downstream, so request another T
                    }
                } else {
                    buffer.add(t);
                    request(1); // Request additional T since we "swallowed" the incoming item without calling the downstream subscriber
                }
            }

            // Emits the current group (if any) and starts a fresh list; returns true if a list was emitted
            private boolean submitAndClearBuffer() {
                if (buffer.isEmpty()) {
                    return false;
                }
                List<T> finished = buffer;
                buffer = new ArrayList<>();
                s.onNext(finished);
                return true;
            }
        };
    }
}
I can apply this operator to the original observable using lift and get an observable that emits lists of strings.
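The request(1) calls are what keep this operator from stalling under backpressure: because the inner subscriber shares its request chain with the downstream subscriber, every item taken from upstream must end in either one list sent downstream or one extra request(1). In particular, the very first item emits nothing, so it needs a request(1) as well; otherwise a subscriber that requests a single list would wait forever. Below is a hypothetical plain-Java model (no RxJava) of that bookkeeping in onNext, keying on the first character as the other answers do, so the invariant can be checked.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of the operator's onNext bookkeeping, without RxJava. */
final class BufferWhileAccounting {
    int listsEmitted;  // s.onNext(buffer) calls made from onNext
    int extraRequests; // request(1) calls for items that produced no list

    private final List<String> buffer = new ArrayList<>();
    private String currentKey;

    void onNext(String t) {
        String key = t.substring(0, 1); // assumption: the first character is the key
        if (currentKey == null || !currentKey.equals(key)) {
            currentKey = key;
            if (buffer.isEmpty()) {
                extraRequests++; // first item: nothing emitted, request another
            } else {
                listsEmitted++;  // the previous group goes downstream
                buffer.clear();
            }
        } else {
            extraRequests++;     // same group: item swallowed, request another
        }
        buffer.add(t);
    }

    static BufferWhileAccounting consume(List<String> items) {
        BufferWhileAccounting a = new BufferWhileAccounting();
        for (String t : items) {
            a.onNext(t);
        }
        return a;
    }
}
```

For A1 A2 A3 B1 B2 C1, the model emits 2 lists and makes 4 extra requests, which accounts for all 6 items; the last group [C1] is emitted later by onCompleted.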