RxJava share an Observable's emissions between multiple subscribers

I have the following problem:

I have an observable that does some work, and other observables need its output in order to run. I tried subscribing several times to the same observable, but the log shows that the original observable is started once for every subscription.

This is the observable that creates the object:

Observable.create((Observable.OnSubscribe<Api>) subscriber -> {
            if (mApi == null) {
                //do some work
            }
            subscriber.onNext(mApi);
            subscriber.unsubscribe();
        })

This is the observable that needs the object:

loadApi().flatMap(api -> api....()));

I'm using

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.unsubscribeOn(Schedulers.io())

on all observables.

H3x0n asked Apr 23 '16 00:04


2 Answers

I'm not sure I understood your question correctly, but I figure you're looking for a way to share the emissions of an observable between several subscribers. There are several ways of doing this. For one, you could use a ConnectableObservable like so:

ConnectableObservable<Integer> obs = Observable.range(1,3).publish();
obs.subscribe(item -> System.out.println("Sub A: " + item));
obs.subscribe(item -> System.out.println("Sub B: " + item));
obs.connect(); //Now the source observable starts emitting items

Output:

Sub A: 1
Sub B: 1
Sub A: 2
Sub B: 2
Sub A: 3
Sub B: 3

Alternatively, you could use a PublishSubject:

PublishSubject<Integer> subject = PublishSubject.create(); //Create a publish subject
subject.subscribe(item -> System.out.println("Sub A: " + item)); //Subscribe both subscribers on the publish subject
subject.subscribe(item -> System.out.println("Sub B: " + item));    
Observable.range(1,3).subscribe(subject); //Subscribe the subject on the source observable

Output:

Sub A: 1
Sub B: 1
Sub A: 2
Sub B: 2
Sub A: 3
Sub B: 3

Both of these examples are single-threaded, but you can easily add observeOn or subscribeOn calls to make them asynchronous.
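To see what publish() and connect() do mechanically, here is a rough plain-Java model. It is only a sketch and not how RxJava is implemented: MulticastSketch and sourceRuns are hypothetical names, and the inner loop merely stands in for Observable.range(1, 3). The point is that subscribing only registers a listener, while a single connect() run feeds every item to all listeners.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of publish()/connect(); an illustration, not RxJava's implementation.
class MulticastSketch {
    private final List<Consumer<Integer>> subscribers = new ArrayList<>();
    private int sourceRuns = 0; // how many times the source was started

    // Registering a subscriber does not start the source.
    void subscribe(Consumer<Integer> subscriber) {
        subscribers.add(subscriber);
    }

    // Runs the source once per call and hands every item to every registered subscriber.
    void connect() {
        sourceRuns++;
        for (int item = 1; item <= 3; item++) { // stands in for Observable.range(1, 3)
            for (Consumer<Integer> s : subscribers) {
                s.accept(item);
            }
        }
    }

    int sourceRuns() {
        return sourceRuns;
    }

    public static void main(String[] args) {
        MulticastSketch obs = new MulticastSketch();
        obs.subscribe(item -> System.out.println("Sub A: " + item));
        obs.subscribe(item -> System.out.println("Sub B: " + item));
        obs.connect(); // the source starts only now
        System.out.println("source runs: " + obs.sourceRuns());
    }
}
```

Because each item is dispatched to all subscribers before the next one is produced, the output interleaves Sub A and Sub B exactly as in the listings above, and the source runs a single time.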

Malt answered Oct 12 '22 10:10


First of all, using Observable.create is tricky and easy to get wrong. You need something like:

Observable.create(subscriber -> {
    if (mApi == null) {
        //do some work
    }
    if (!subscriber.isUnsubscribed()) {
        subscriber.onNext(mApi);
        subscriber.onCompleted();
        // Not subscriber.unsubscribe();
    }
})

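Then you can use

Observable<Integer> obs = Observable.just(1).replay(1).autoConnect();

Note that autoConnect() returns a plain Observable, not a ConnectableObservable; it connects to the source on the first subscription, and replay(1) caches the last item. All subsequent subscribers should get the single emitted item: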

obs.subscribe(item -> System.out.println("Sub 1 " + item));
obs.subscribe(item -> System.out.println("Sub 2 " + item));
obs.subscribe(item -> System.out.println("Sub 3 " + item));
obs.subscribe(item -> System.out.println("Sub 4 " + item));
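As a rough mental model of why this avoids repeating the work, the following plain-Java sketch runs a source once and replays its value to every later subscriber. It is an assumption-laden illustration, not RxJava's implementation: ReplayLastSketch, the Supplier standing in for the expensive work, and the "api-instance" value are all hypothetical.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Plain-Java model of "run once, replay the last value"; not RxJava's implementation.
class ReplayLastSketch<T> {
    private final Supplier<T> work; // stands in for the expensive source
    private boolean started = false;
    private T cached;

    ReplayLastSketch(Supplier<T> work) {
        this.work = work;
    }

    // The first subscriber triggers the work; later ones receive the cached value.
    synchronized void subscribe(Consumer<T> subscriber) {
        if (!started) {
            cached = work.get();
            started = true;
        }
        subscriber.accept(cached);
    }

    public static void main(String[] args) {
        int[] runs = {0};
        ReplayLastSketch<String> api = new ReplayLastSketch<>(() -> {
            runs[0]++;             // counts how often the work is performed
            return "api-instance"; // hypothetical placeholder value
        });
        api.subscribe(v -> System.out.println("Sub 1 " + v));
        api.subscribe(v -> System.out.println("Sub 2 " + v));
        System.out.println("work executed " + runs[0] + " time(s)");
    }
}
```

In the question's terms, the Supplier plays the role of the "do some work" step: however many subscribers arrive, it is invoked only for the first one.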
JohnWowUs answered Oct 12 '22 10:10