
RxJava: switch to the second Observable once it starts emitting items

Tags:

rx-java

I have a set of observables that I run in parallel, such as localObservable and networkObservable. Once networkObservable starts emitting items, I need only its items from that point on, so anything emitted by localObservable should be discarded (localObservable may not even have started yet).

Observable<Integer> localObservable =
            Observable.defer(() -> Observable.range(1, 10)).subscribeOn(Schedulers.io());
Observable<Integer> networkObservable =
            Observable.defer(() -> Observable.range(11, 20)).subscribeOn(Schedulers.io());
xymelon asked Jan 23 '26 09:01



1 Answer

You can do something like this:

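// First item after 1000 ms, then one every 500 ms.
// share() lets merge and takeUntil observe the same underlying stream
// instead of each starting its own copy.
Observable<Long> networkObservable =
        Observable.interval(1000, 500, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io())
                .share();

// One item every 500 ms, stopped by the first item from networkObservable.
Observable<Long> localObservable =
        Observable.interval(500, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io())
                .takeUntil(networkObservable);

// networkObservable is listed first so that it is subscribed first.
Observable.merge(networkObservable, localObservable)
        .subscribe(System.out::println);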

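This will output something like the following (both sources are due to emit at the 1000 ms mark, so the exact interleaving there depends on timing):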

0 // localObservable 
1 // localObservable 
0 // networkObservable from here on
1
2
...

takeUntil makes localObservable stop and unsubscribe as soon as networkObservable emits its first item. The merged Observable therefore emits items from localObservable only while networkObservable has not started; once it has, the merged stream stops emitting from localObservable and emits only from networkObservable.
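
To make the gating rule easier to follow without timers, here is a minimal plain-Java sketch of what merge plus takeUntil does in this answer. It does not use RxJava at all: SwitchGate, onLocal, onNetwork and demo are hypothetical names invented for illustration, and the sketch only models "forward local items until the first network item arrives", not subscription or unsubscription.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical model of merge(network, local.takeUntil(network)); not an RxJava class. */
final class SwitchGate<T> {
    private final Consumer<T> downstream;
    private boolean networkStarted = false;

    SwitchGate(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    /** Local items are forwarded only while no network item has been seen. */
    synchronized void onLocal(T item) {
        if (!networkStarted) {
            downstream.accept(item);
        }
    }

    /** The first network item closes the gate for local items; all network items are forwarded. */
    synchronized void onNetwork(T item) {
        networkStarted = true;
        downstream.accept(item);
    }

    /** Replays one possible interleaving and returns what reached downstream. */
    static List<String> demo() {
        List<String> out = new ArrayList<>();
        SwitchGate<String> gate = new SwitchGate<>(out::add);
        gate.onLocal("local 0");
        gate.onLocal("local 1");
        gate.onNetwork("network 0");
        gate.onLocal("local 2");   // arrives after the switch, so it is dropped
        gate.onNetwork("network 1");
        return out;
    }
}
```

Calling SwitchGate.demo() returns [local 0, local 1, network 0, network 1]: the late "local 2" is discarded, which is the behaviour takeUntil provides in the RxJava version.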

yosriz answered Jan 25 '26 06:01



