Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava subscribe and observe on same thread as unit test

I want to write a kind of "blackbox test" for a component, that internally uses RxJava.

Internally it uses Retrofit which returns an Observable to make a httpcall and afterwards it uses .flatmap() to future processing on the data retrieved from retrofit. The idea is to give that component a Transformer for setting the schedulers on the observer like this:

class DefaultTransformer <T> implements Transformer<T, T> {

   public Observable<T> call(Observable<T> observable) { 
      return observable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread());
   }
}

My Component does something like this:

void execute(Transformer<T, T> scheduler){
     Observable<List<Team>> observable = retrofitApi.getLeague(leagueId, seasonId)
        .flatMap(new Func1<LeagueWrapper, Observable<List<Team>>>() {
          @Override public Observable<List<Team>> call(LeagueWrapper wrapper) {                
             return Observable.just(wrapper.getLeague().getTeams());
          }
        });

   observable.compose(transformer);

   observable.subscribe(this);
}

In production I pass DefaultTransformer as parameter, but for Unit tests I want to submit a Transformer that runs on the same thread as the unit test, so everything should run in sync (not async).

I tried that:

class UnitTestTransformer <T> implements Transformer<T, T> {

       public Observable<T> call(Observable<T> observable) { 
          return observable.subscribeOn(Schedulers.test()).observeOn(AndroidSchedulers.test());
       }
    }

But it still runs async in my unit tests. I also tried Scheduler.immediate(). toBlocking() seems not to be an option, because it's not an Observable anymore. Any idea what could be wrong?

like image 351
sockeqwe Avatar asked Oct 20 '22 15:10

sockeqwe


1 Answers

If changing the pattern of how execute() is called is not an option, you may want to try using the RxJava Plugin mechanism.

https://github.com/ReactiveX/RxJava/wiki/Plugins

You can provide:

  • RxJavaSchedulersHook to override the schedulers that are provided during test execution and get them to execute synchronously
  • RxJavaObservableExecutionHook to hook into the Observable execution pipeline and use some kind of synchronization method (like a CountdownLatch) to wait for the Observable subscriptions to finish before continuing
like image 102
Ross Hambrick Avatar answered Oct 22 '22 21:10

Ross Hambrick