Here's a snippet I'm trying to figure out:
class RaceCondition {
Subject<Integer, Integer> subject = PublishSubject.create();
public void entryPoint(Integer data) {
subject.onNext(data);
}
public void client() {
subject /*some operations*/
.buffer(getClosingSelector())
.subscribe(/*handle results*/);
}
private Observable<Integer> getClosingSelector() {
return subject /* some filtering */;
}
}
There's a Subject
that accepts events from the outside. There's a client subscribed to this subject that processes the events and also buffer
s them. The main idea here is that the buffered items should be emitted each time based on some condition calculated using an item from the stream.
For this purpose the buffer boundary itself listens to the subject.
An important desired behaviour: whenever the boundary emits the item, it should also be included in the following emission of the buffer
. It's not the case with the current configuration as the item (at least that's what I think) is emitted from the closing selector before it reaches the buffer
, so that it's not included in the current emission but is left behind waiting for the next one.
Is there a way to essentially make closing selector wait for the item to be buffered first? If not, is there another way to buffer and release items based on the next incoming item?
If I understand correctly, you want to buffer until some predicate allows it based on the items. You can do this with a complicated set of operators but perhaps its easier to just write a custom operator:
public final class BufferUntil<T>
implements Operator<List<T>, T>{
final Func1<T, Boolean> boundaryPredicate;
public BufferUntil(Func1<T, Boolean> boundaryPredicate) {
this.boundaryPredicate = boundaryPredicate;
}
@Override
public Subscriber<? super T> call(
Subscriber<? super List<T>> child) {
BufferWhileSubscriber parent =
new BufferWhileSubscriber(child);
child.add(parent);
return parent;
}
final class BufferWhileSubscriber extends Subscriber<T> {
final Subscriber<? super List<T>> actual;
List<T> buffer = new ArrayList<>();
/**
* @param actual
*/
public BufferWhileSubscriber(
Subscriber<? super List<T>> actual) {
this.actual = actual;
}
@Override
public void onNext(T t) {
buffer.add(t);
if (boundaryPredicate.call(t)) {
actual.onNext(buffer);
buffer = new ArrayList<>();
}
}
@Override
public void onError(Throwable e) {
buffer = null;
actual.onError(e);
}
@Override
public void onCompleted() {
List<T> b = buffer;
buffer = null;
if (!b.isEmpty()) {
actual.onNext(b);
}
actual.onCompleted();
}
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With