I have the following problem:
I have an observable that does some work, and other observables need its output. I tried subscribing several times to the same observable, but in the log I see that the original observable is started multiple times.
This is the observable that creates the object:
Observable.create((Observable.OnSubscribe<Api>) subscriber -> {
    if (mApi == null) {
        //do some work
    }
    subscriber.onNext(mApi);
    subscriber.unsubscribe();
})
This is the observable that needs the object:
loadApi().flatMap(api -> api....()));
I'm using
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.unsubscribeOn(Schedulers.io())
on all observables.
I'm not sure I understood your question correctly, but I figure you're looking for a way to share the emissions of an observable between several subscribers. There are several ways of doing this. For one, you could use a ConnectableObservable like so:
ConnectableObservable<Integer> obs = Observable.range(1,3).publish();
obs.subscribe(item -> System.out.println("Sub A: " + item));
obs.subscribe(item -> System.out.println("Sub B: " + item));
obs.connect(); //Now the source observable starts emitting items
Output:
Sub A: 1
Sub B: 1
Sub A: 2
Sub B: 2
Sub A: 3
Sub B: 3
Alternatively, you could use a PublishSubject:
PublishSubject<Integer> subject = PublishSubject.create(); //Create a publish subject
subject.subscribe(item -> System.out.println("Sub A: " + item)); //Subscribe both subscribers on the publish subject
subject.subscribe(item -> System.out.println("Sub B: " + item));
Observable.range(1,3).subscribe(subject); //Subscribe the subject on the source observable
Output:
Sub A: 1
Sub B: 1
Sub A: 2
Sub B: 2
Sub A: 3
Sub B: 3
Both of these examples are single-threaded, but you can easily add observeOn or subscribeOn calls to make them asynchronous.
First of all, using Observable.create is tricky and easy to get wrong. You need something like:
Observable.create(subscriber -> {
    if (mApi == null) {
        //do some work
    }
    if (!subscriber.isUnsubscribed()) {
        subscriber.onNext(mApi);
        subscriber.onCompleted();
        // Not subscriber.unsubscribe();
    }
})
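You can use replay(1) together with autoConnect(). Note that autoConnect() returns a plain Observable, not a ConnectableObservable:
Observable<Integer> obs = Observable.just(1).replay(1).autoConnect();
All subsequent subscribers should get the single emitted item: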
obs.subscribe(item -> System.out.println("Sub 1 " + item));
obs.subscribe(item -> System.out.println("Sub 2 " + item));
obs.subscribe(item -> System.out.println("Sub 3 " + item));
obs.subscribe(item -> System.out.println("Sub 4 " + item));
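To make the difference concrete, here is a minimal plain-Java sketch of the idea behind replaying a single item. It is deliberately not RxJava, so it runs without any dependency, and every name in it (SharedResultSketch, Cold, Cached, countingWork) is hypothetical. It only illustrates the behaviour: a cold source repeats its work for each subscriber, while a caching source does the work once and hands the stored value to later subscribers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Plain-Java illustration only; every name here is hypothetical and
// none of this is RxJava API.
class SharedResultSketch {

    // Stand-in for the "do some work" step; counts how often it runs.
    static Supplier<String> countingWork(AtomicInteger runs) {
        return () -> "api#" + runs.incrementAndGet();
    }

    // Cold behaviour: the work is repeated for every subscriber.
    static final class Cold<T> {
        private final Supplier<T> work;

        Cold(Supplier<T> work) {
            this.work = work;
        }

        void subscribe(Consumer<T> subscriber) {
            subscriber.accept(work.get());
        }
    }

    // Cached behaviour: the first subscriber triggers the work, later
    // subscribers receive the stored value.
    static final class Cached<T> {
        private final Supplier<T> work;
        private T value;
        private boolean done;

        Cached(Supplier<T> work) {
            this.work = work;
        }

        void subscribe(Consumer<T> subscriber) {
            T result;
            synchronized (this) {
                if (!done) {
                    value = work.get();
                    done = true;
                }
                result = value;
            }
            // Deliver outside the lock so a slow subscriber does not block others.
            subscriber.accept(result);
        }
    }

    // Two subscribers on a cold source: the work runs twice.
    static List<String> runCold(AtomicInteger runs) {
        List<String> seen = new ArrayList<>();
        Cold<String> source = new Cold<>(countingWork(runs));
        source.subscribe(seen::add);
        source.subscribe(seen::add);
        return seen;
    }

    // Two subscribers on a cached source: the work runs once.
    static List<String> runCached(AtomicInteger runs) {
        List<String> seen = new ArrayList<>();
        Cached<String> source = new Cached<>(countingWork(runs));
        source.subscribe(seen::add);
        source.subscribe(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("cold:   " + runCold(new AtomicInteger()));   // [api#1, api#2]
        System.out.println("cached: " + runCached(new AtomicInteger())); // [api#1, api#1]
    }
}
```

Roughly speaking, Cold corresponds to subscribing several times to the original Observable.create source, and Cached to what replay(1).autoConnect() gives you when the source emits a single item.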