I'm looking for a Subject (or something similar) that can:
BehaviorSubject
almost would do the job, but it retains last observed item.
UPDATE
Based on accepted answer I worked out similar solution for single observed item. Also added unsubscription part to avoid memory leaks.
class LastEventObservable private constructor(
private val onSubscribe: OnSubscribe<Any>,
private val state: State
) : Observable<Any>(onSubscribe) {
fun emit(value: Any) {
if (state.subscriber.hasObservers()) {
state.subscriber.onNext(value)
} else {
state.lastItem = value
}
}
companion object {
fun create(): LastEventObservable {
val state = State()
val onSubscribe = OnSubscribe<Any> { subscriber ->
just(state.lastItem)
.filter { it != null }
.doOnNext { subscriber.onNext(it) }
.doOnCompleted { state.lastItem = null }
.subscribe()
val subscription = state.subscriber.subscribe(subscriber)
subscriber.add(Subscriptions.create { subscription.unsubscribe() })
}
return LastEventObservable(onSubscribe, state)
}
}
private class State {
var lastItem: Any? = null
val subscriber = PublishSubject.create<Any>()
}
}
I achieve the expected result creating a customized Observable that wraps a publish subject and handles emission cache if there's no subscribers attached. Check it out.
public class ExampleUnitTest {
@Test
public void testSample() throws Exception {
MyCustomObservable myCustomObservable = new MyCustomObservable();
myCustomObservable.emit("1");
myCustomObservable.emit("2");
myCustomObservable.emit("3");
Subscription subscription = myCustomObservable.subscribe(System.out::println);
myCustomObservable.emit("4");
myCustomObservable.emit("5");
subscription.unsubscribe();
myCustomObservable.emit("6");
myCustomObservable.emit("7");
myCustomObservable.emit("8");
myCustomObservable.subscribe(System.out::println);
}
}
class MyCustomObservable extends Observable<String> {
private static PublishSubject<String> publishSubject = PublishSubject.create();
private static List<String> valuesCache = new ArrayList<>();
protected MyCustomObservable() {
super(subscriber -> {
Observable.from(valuesCache)
.doOnNext(subscriber::onNext)
.doOnCompleted(valuesCache::clear)
.subscribe();
publishSubject.subscribe(subscriber);
});
}
public void emit(String value) {
if (publishSubject.hasObservers()) {
publishSubject.onNext(value);
} else {
valuesCache.add(value);
}
}
}
Hope that it helps!
Best Regards.
If you only want to wait for a single subscriber, use UnicastSubject
but note that if you unsubscribe in the middle, all subsequent queued items will be lost.
Edit:
Once we have a subscriber all items are consumed and never emitted again
For multiple subscribers, use ReplaySubject
.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With