I have a set of observables that I run in parallel, such as localObservable and networkObservable. Once networkObservable starts emitting items, I need only those items, so anything localObservable emits from then on should be discarded (localObservable may not even have started yet).
Observable<Integer> localObservable =
    Observable.defer(() -> Observable.range(1, 10)).subscribeOn(Schedulers.io());
Observable<Integer> networkObservable =
    Observable.defer(() -> Observable.range(11, 20)).subscribeOn(Schedulers.io());
You can do something like this:
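// First item after 1000 ms, then one every 500 ms.
// share() lets takeUntil and merge use the same underlying subscription.
Observable<Long> networkObservable =
    Observable.interval(1000, 500, TimeUnit.MILLISECONDS)
        .subscribeOn(Schedulers.io())
        .share();
// One item every 500 ms, cut off at networkObservable's first emission.
Observable<Long> localObservable =
    Observable.interval(500, TimeUnit.MILLISECONDS)
        .subscribeOn(Schedulers.io())
        .takeUntil(networkObservable);
Observable.merge(networkObservable, localObservable)
    .subscribe(System.out::println);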
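This will print something like the following (the two sources both fire at about 1000 ms, so the last local item may or may not appear):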
0 // localObservable
1 // localObservable
0 // networkObservable from here on
1
2
...
takeUntil makes localObservable stop and unsubscribe as soon as networkObservable emits its first item. The merged Observable therefore emits from localObservable only while networkObservable has not started; once it has, the merged stream emits only from networkObservable. The share() call matters because networkObservable is subscribed to twice (once by takeUntil and once by merge), and sharing keeps both on the same underlying source.
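Because the interval-based version depends on timing, a deterministic sketch can make the cut-over easier to see. The one below is only an illustration: it assumes RxJava 3 (the `io.reactivex.rxjava3` packages) and replaces the two sources with hypothetical PublishSubject instances that are pushed by hand. The class name TakeUntilDemo and the item strings are made up for the example.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;

class TakeUntilDemo {
    static List<String> run() {
        // Hot sources driven manually, standing in for the real observables (assumption).
        PublishSubject<String> local = PublishSubject.create();
        PublishSubject<String> network = PublishSubject.create();
        List<String> seen = new ArrayList<>();

        // Same shape as the answer: merge the network source with the
        // local source truncated at the first network emission.
        Observable.merge(network, local.takeUntil(network))
                .subscribe(seen::add);

        local.onNext("local-1");
        local.onNext("local-2");
        network.onNext("network-1"); // takeUntil ends the local branch here
        local.onNext("local-3");     // dropped: the local branch is already disposed
        network.onNext("network-2");
        return seen;
    }

    public static void main(String[] args) {
        // prints [local-1, local-2, network-1, network-2]
        System.out.println(run());
    }
}
```

A PublishSubject is already hot, so no share() is needed in this sketch; with cold sources such as interval or a real network call, share() (or a similar multicast operator) is still needed, as in the answer.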