
RxAndroid: Create Simple Hot Observable

I'm creating an Observable which emits Integers when subscribed to. My implementation right now is set up so the act of subscribing to it triggers the generation from the start, as follows:

private Observable<Integer> createObservable() {
    return Observable.create (
        new Observable.OnSubscribe<Integer>() {

            @Override
            public void call(Subscriber<? super Integer> sub) {

                for (int i = 1; i < MAX_PROGRESS + 1; i++) {
                    sub.onNext(i);
                    SystemClock.sleep(1000);
                }
                sub.onCompleted();
            }
        }
    );
}

My understanding is that this is a cold Observable. I want the sequence to be generated irrespective of any subscribers, and when a subscriber does subscribe, I want it to receive whatever value is current at the time of the subscription. In other words, I want to turn this into a hot Observable. I'd rather not subclass Observable because that ties it to a concrete Integer type, whereas in practice the actual type will vary.

Carl Whalley asked Dec 02 '15 22:12



1 Answer

Check out rx.subjects.BehaviorSubject<T>. If you aren't familiar with rx.subjects.Subject, the most general way I can think of to describe subjects is that they break the continuity of subscriptions between points A and B. A subject does this by being both an Observer<T> and an Observable<T>. As an Observer<T>, it can accept onNext() calls from multiple sources (WARNING: external thread safety required). As an Observable<T>, it lets multiple Observer<T>s subscribe, and each incoming onNext() is multicast to every downstream Observer<T>.
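
To make the "both sides" idea concrete, here is a minimal plain-Java sketch of the multicast part. MulticastSketch is a hypothetical stand-in written for illustration; it is not RxJava's Subject and it ignores onError/onCompleted and unsubscription. It also synchronizes internally, which is an assumption of this sketch rather than something the real subjects do for you.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for illustration only; this is not RxJava's Subject.
final class MulticastSketch<T> implements Consumer<T> {
    private final List<Consumer<? super T>> downstream = new ArrayList<>();

    // "Observable" side: register a downstream consumer.
    synchronized void subscribe(Consumer<? super T> observer) {
        downstream.add(observer);
    }

    // "Observer" side: any upstream source can push a value in,
    // and it is forwarded to every consumer registered so far.
    @Override
    public synchronized void accept(T value) {
        for (Consumer<? super T> observer : downstream) {
            observer.accept(value);
        }
    }

    // Small demo that records what each downstream consumer sees.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MulticastSketch<Integer> subject = new MulticastSketch<>();
        subject.accept(1); // nobody is subscribed yet, so this value is dropped
        subject.subscribe(v -> log.add("A" + v));
        subject.subscribe(v -> log.add("B" + v));
        subject.accept(2);
        subject.accept(3);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that a plain multicast like this drops values pushed before anyone subscribes; that gap is what the "latest value" behaviour of BehaviorSubject fills.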

If your code looked like

Observable<T> src = ...;
Subscriber<T> dst;
src.subscribe(dst);

The way to use the BehaviorSubject is

Observable<T> src = ...;
BehaviorSubject<T> subject = BehaviorSubject.create(defaultValue);
src.subscribe(subject);

Subscribe the subject to the source immediately, and it will take in values as fast as they are dished out. The BehaviorSubject keeps only the most recent value; the defaultValue and all earlier values are dropped once a newer one arrives.

// safe to do multiple times.
Subscriber<T> dst;
subject.subscribe(dst);

dst receives the most recent value from src (or the defaultValue if nothing has been emitted yet) immediately upon subscribing, and then all subsequent values until dst unsubscribes.
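
The replay-on-subscribe behaviour can be sketched in plain Java as well. LatestValueSketch is again a hypothetical stand-in, not the library class: it only models "remember the latest value, hand it to each new subscriber, then forward everything after that", and leaves out completion, errors and unsubscription.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in mimicking the "latest value" behaviour; not RxJava's BehaviorSubject.
final class LatestValueSketch<T> implements Consumer<T> {
    private final List<Consumer<? super T>> downstream = new ArrayList<>();
    private T latest;

    LatestValueSketch(T defaultValue) {
        this.latest = defaultValue;
    }

    // A new subscriber first gets the current value, then all later ones.
    synchronized void subscribe(Consumer<? super T> observer) {
        observer.accept(latest);
        downstream.add(observer);
    }

    // The source pushes values here whether or not anyone is subscribed.
    @Override
    public synchronized void accept(T value) {
        latest = value; // the previous value (or the default) is forgotten
        for (Consumer<? super T> observer : downstream) {
            observer.accept(value);
        }
    }

    // Small demo: one early subscriber, one that joins after two values.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        LatestValueSketch<Integer> subject = new LatestValueSketch<>(0);
        subject.subscribe(v -> log.add("early:" + v)); // sees the default first
        subject.accept(1);
        subject.accept(2);
        subject.subscribe(v -> log.add("late:" + v));  // sees 2, not 0 or 1
        subject.accept(3);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With the real library, the wiring for the Integer source in the question would presumably look something like createObservable().subscribeOn(Schedulers.io()).subscribe(subject). The subscribeOn call is an assumption added here, not part of the answer above: the loop in the question sleeps on whichever thread subscribes, so without moving it to another thread the subscribe call would block until the sequence completes.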

WARNING: Subjects have a tendency to be overused, so be sure you need one.

George Campbell answered Oct 06 '22 15:10
