
How to test that observable uses correct schedulers in RxJava?

I have the following class (simplified):

public class Usecase<T> {
    private final Observable<T> get;
    private final Scheduler observeScheduler;

    public Usecase(Observable<T> get, Scheduler observeScheduler) {
        this.get = get;
        this.observeScheduler = observeScheduler;
    }

    public Observable<T> execute() {
        return get.subscribeOn(Schedulers.io()).observeOn(observeScheduler);
    }
}

And I'm writing unit tests for it. How can I test that subscribeOn and observeOn were called with correct values?

I tried the following:

    Observable<String> observable = mock(Observable.class);
    Usecase<String> usecase = new Usecase<>(observable, Schedulers.computation());
    usecase.execute();

    verify(observable).subscribeOn(Schedulers.computation()); // should fail here, but passes
    verify(observable).observeOn(Schedulers.computation());   // should pass, but fails: Missing method call for verify(mock) here

The above fails (I think) because subscribeOn and observeOn are final methods. So maybe there is some other way to ensure that the observable uses the correct schedulers?

asked Apr 08 '16 at 13:04 by netimen


1 Answer

You can indirectly inspect both the thread an Observable is subscribed on and the thread it is observed on, which means you can in fact verify that the Observable uses the correct schedulers.

The only handle we have is the thread name. Fortunately, the threads used by each scheduler are named with a consistent prefix that we can match against. For reference, here is the (possibly incomplete) list of schedulers with unique prefixes:

  • Schedulers.io() - RxCachedThreadScheduler
  • Schedulers.newThread() - RxNewThreadScheduler
  • Schedulers.computation() - RxComputationThreadPool
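
The prefix list above can be wrapped in a small lookup so that tests compare against a scheduler label instead of repeating string literals. This is only a sketch: `SchedulerThreadNames` and `schedulerFor` are hypothetical names, not part of RxJava, and the sample thread name `RxCachedThreadScheduler-1` is assumed for illustration. Thread names are an implementation detail, so the prefixes may differ between RxJava versions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of RxJava): maps a thread name to the
// scheduler whose prefix it carries, using only the prefixes listed above.
final class SchedulerThreadNames {
    private static final Map<String, String> PREFIXES = new LinkedHashMap<>();
    static {
        PREFIXES.put("RxCachedThreadScheduler", "Schedulers.io()");
        PREFIXES.put("RxNewThreadScheduler", "Schedulers.newThread()");
        PREFIXES.put("RxComputationThreadPool", "Schedulers.computation()");
    }

    private SchedulerThreadNames() {}

    // Returns the scheduler label for a thread name, or null if no known prefix matches.
    static String schedulerFor(String threadName) {
        for (Map.Entry<String, String> entry : PREFIXES.entrySet()) {
            if (threadName.startsWith(entry.getKey())) {
                return entry.getValue();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Assumed example thread name; real names depend on the RxJava version.
        System.out.println(schedulerFor("RxCachedThreadScheduler-1"));
        // A thread that does not belong to any listed scheduler.
        System.out.println(schedulerFor("main"));
    }
}
```

In a test, the name passed to `schedulerFor` would come from `Thread.currentThread().getName()` or `TestSubscriber#getLastSeenThread`, as in the snippets that follow.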

To verify that an Observable was subscribed to on an IO thread:

// Calling Thread.currentThread() inside Observable.OnSubscribe returns the
// thread the Observable is subscribed on (a Schedulers.io() thread in our case)
Observable<String> obs = Observable.create((Subscriber<? super String> s) -> {
    s.onNext(Thread.currentThread().getName());
    s.onCompleted();
});

// Schedule the Observable; Schedulers.immediate() keeps observation on the emitting thread
Usecase<String> usecase = new Usecase<>(obs, Schedulers.immediate());
Observable<String> usecaseObservable = usecase.execute();

// Verify the Observable emitted the name of the IO thread
String subscribingThread = usecaseObservable.toBlocking().first();
assertThat(subscribingThread).startsWith("RxCachedThreadScheduler");
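
The same idea can be tried without RxJava on the classpath. The sketch below is only an analogy, not RxJava code: a plain `ExecutorService` with a hypothetical thread-name prefix `demo-io-` stands in for `Schedulers.io()`, and the submitted task reports `Thread.currentThread().getName()` just as the `Observable.create` body above does. `ThreadNameProbe` and its methods are names made up for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a scheduler: work runs on threads whose names
// share a known prefix, and the work itself reports which thread ran it.
final class ThreadNameProbe {
    static final String PREFIX = "demo-io-"; // assumed prefix, for illustration only

    private ThreadNameProbe() {}

    // Creates a cached thread pool whose threads are named PREFIX + counter.
    static ExecutorService newNamedExecutor() {
        AtomicInteger counter = new AtomicInteger();
        return Executors.newCachedThreadPool(runnable -> {
            Thread thread = new Thread(runnable, PREFIX + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        });
    }

    // Runs a task on the named pool and returns the name of the thread that executed it.
    static String runAndReportThread() {
        ExecutorService executor = newNamedExecutor();
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            // Block until the task finishes, much like toBlocking().first() above.
            return executor.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        String name = runAndReportThread();
        if (!name.startsWith(PREFIX)) {
            throw new AssertionError("unexpected thread: " + name);
        }
        System.out.println("task ran on " + name);
    }
}
```

The design point carries over to the RxJava test: the assertion never touches the scheduler object itself, only the name of the thread the work actually ran on.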

To verify that an Observable was observed on a computation thread, use TestSubscriber#getLastSeenThread, which returns the last thread on which the subscriber received an item or notification.

TestSubscriber<Object> subscriber = TestSubscriber.create();
Usecase<Object> usecase = new Usecase<>(Observable.empty(), Schedulers.computation());
usecase.execute().subscribe(subscriber);

// The observable runs asynchronously, so wait for it to complete
subscriber.awaitTerminalEvent();
subscriber.assertNoErrors();

// Verify the observable was observed on the computation thread
String observingThread = subscriber.getLastSeenThread().getName();
assertThat(observingThread).startsWith("RxComputationThreadPool");

No third-party libraries or mocking are necessary, though I am using AssertJ for the fluent startsWith assertion.

answered Oct 09 '22 at 13:10 by Roth