
RxJava - running multiple observables one after another (like concat, but with onCompleted for each observable)

Is there some way to achieve the following?

I have 3 observables of the type Observable<MyData>. What I want is the following:

  • run the first observable
  • observe the onCompleted of the first observable
  • run the second observable
  • observe the onCompleted of the second observable
  • run the third observable
  • observe the onCompleted of the third observable

This can be done with concat, but then I will only be able to observe the last onCompleted.

Ugly solution

I know I can achieve that if I just start the next observable from the onCompleted event of the previous one.

Question

Is there any other way to achieve this, even with an arbitrary number of observables? I want to avoid chaining everything together from the onCompleted events, as this looks really ugly, and the deeper the chaining goes, the less clear it gets.

Edit - UseCase

  • First, I want to emit the data my app had loaded the last time it was started (I will serialise that data to disk). The reason is that I want a very fast app start every time.
  • Then I want a second run of data loading, to load up-to-date data in an empty state.
  • Then I want a third run of data loading, to load the missing deeper data of the up-to-date data.

I want to constantly update the UI, and I want to know when each level of data loading has finished.

prom85 asked Aug 16 '16 12:08



2 Answers

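I hope the code below helps. It attaches doOnCompleted to each source before passing it to concat, so each source's completion triggers its own callback.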

Observable<MyData> observable1 = ...;
Observable<MyData> observable2 = ...;
Observable<MyData> observable3 = ...;

Observable
        .concat(observable1.doOnCompleted(this::onCompleteObservable1),
                observable2.doOnCompleted(this::onCompleteObservable2),
                observable3.doOnCompleted(this::onCompleteObservable3))
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe( ... );

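Here are the sample methods. Note that doOnCompleted is applied upstream of observeOn, so unless a source switches threads itself, these callbacks run on the thread set by subscribeOn, not on the main thread.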

void onCompleteObservable1() {
    //do some work
}

void onCompleteObservable2() {
    //do some work
}

void onCompleteObservable3() {
    //do some work
}
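
The question also asks about an arbitrary number of observables. In RxJava 1 the same idea should extend to a list by wrapping each source with doOnCompleted in a loop and handing the wrapped list to Observable.concat(Observable.from(list)). To make the resulting event order explicit without depending on RxJava, here is a minimal plain-Java sketch of that shape. Everything in it (StagedLoad, Source, concatWithStageCallbacks, the item names) is a hypothetical stand-in invented for illustration, not part of any library, and it models only synchronous sources.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntConsumer;

class StagedLoad {
    /** Hypothetical stand-in for a cold, synchronous source: pushes items, then returns (= completion). */
    interface Source<T> {
        void run(Consumer<T> onNext);
    }

    /** Models doOnCompleted: runs the action right after the wrapped source has finished. */
    static <T> Source<T> doOnCompleted(Source<T> source, Runnable action) {
        return onNext -> {
            source.run(onNext);
            action.run();
        };
    }

    /** Models concat over any number of stages, reporting the index of each stage as it completes. */
    static <T> Source<T> concatWithStageCallbacks(List<Source<T>> stages, IntConsumer onStageCompleted) {
        List<Source<T>> wrapped = new ArrayList<>();
        for (int i = 0; i < stages.size(); i++) {
            final int stage = i; // captured by the lambda below
            wrapped.add(doOnCompleted(stages.get(i), () -> onStageCompleted.accept(stage)));
        }
        // Run the wrapped stages strictly one after another.
        return onNext -> {
            for (Source<T> s : wrapped) {
                s.run(onNext);
            }
        };
    }

    /** Runs three illustrative stages (cached, fresh, deep) and records every event in order. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        List<Source<String>> stages = new ArrayList<>();
        stages.add(onNext -> onNext.accept("cached"));
        stages.add(onNext -> { onNext.accept("fresh-1"); onNext.accept("fresh-2"); });
        stages.add(onNext -> onNext.accept("deep"));
        concatWithStageCallbacks(stages, stage -> log.add("stage " + stage + " completed"))
                .run(item -> log.add("item " + item));
        log.add("all completed");
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Each "stage N completed" entry lands directly after the last item of that stage and before the first item of the next one, which is the ordering the question asks for. A single callback that receives the stage index avoids writing one method per observable.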
nshmura answered Nov 13 '22 03:11



I think concatMap could be the answer. With concatMap you can concatenate observables and subscribe once, so your code could be something like:

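// Assumes RxJava 1.x (rx.Observable, rx.schedulers.Schedulers) and java.util.Random.
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    // concatMap subscribes to one inner observable at a time, so the source order is preserved
    .concatMap(integer -> Observable.just(integer)
            .observeOn(Schedulers.computation())
            .concatMap(i -> {
                try {
                    // simulate work of varying duration
                    Thread.sleep(new Random().nextInt(1000));
                    return Observable.just(2 * i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return Observable.error(e);
                }
            }))
    .subscribe(System.out::println,
            Throwable::printStackTrace,
            () -> System.out.println("onCompleted"));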
Marco C. answered Nov 13 '22 04:11
