I have the following class (simplified):
public class Usecase<T> {
private final Observable<T> get;
private final Scheduler observeScheduler;
public Usecase(Observable<T> get, Scheduler observeScheduler) {
this.get = get;
this.observeScheduler = observeScheduler;
}
public Observable<T> execute() {
return get.subscribeOn(Schedulers.io()).observeOn(observeScheduler);
}
}
And I'm writing unit tests for it. How can I test that subscribeOn
and observeOn
were called with correct values?
I try the following:
Observable<String> observable = mock(Observable.class);
Usecase<String> usecase = new Usecase(observable, Schedulers.computation());
usecase.execute();
verify(observable).subscribeOn(Schedulers.computation()); // should fail here, but passes
verify(observable).observeOn(Schedulers.computation()); // should pass, but fails: Missing method call for verify(mock) here
The above fails (I think) because subscribeOn
and observeOn
are final
methods. So may be there is some other way to ensure that the observable uses correct schedulers?
There is a way to indirectly access both threads on which an observable is operating and being observed, which means you can in fact verify the Observable
uses the correct schedulers.
We're confined to verifying threads by name. Fortunately, threads used by Schedulers.io()
are named with a consistent prefix that we can match against. Here's the (full?) list of schedulers that have unique prefixes for reference:
Schedulers.io()
- RxCachedThreadScheduler
Schedulers.newThread()
- RxNewThreadScheduler
Schedulers.computation()
- RxComputationThreadPool
To verify an Observable was subscribed to on the IO thread:
// Calling Thread.currentThread() inside Observer.OnSubscribe will return the
// thread the Observable is running on (Schedulers.io() in our case)
Observable<String> obs = Observable.create((Subscriber<? super String> s) -> {
s.onNext(Thread.currentThread().getName());
s.onCompleted();
})
// Schedule the Observable
Usecase usecase = new Usecase(obs, Schedulers.immediate());
Observable usecaseObservable = usecase.execute();
// Verify the Observable emitted the name of the IO thread
String subscribingThread = usecaseObservable.toBlocking().first();
assertThat(subscribingThread).startsWith("RxCachedThreadScheduler");
To verify an Observable was observed on the computation thread, you can use TestSubscriber#getLastSeenThread
to access the last thread used for observing.
TestSubscriber<Object> subscriber = TestSubscriber.create();
UseCase usecase = new UseCase(Observable.empty(), Schedulers.computation())
usecase.execute().subscribe(subscriber);
// The observable runs asynchronously, so wait for it to complete
subscriber.awaitTerminalEvent();
subscriber.assertNoErrors();
// Verify the observable was observed on the computation thread
String observingThread = subscriber.getLastSeenThread().getName();
assertThat(observingThread).startsWith("RxComputationThreadPool");
No third-party libraries or mocking are necessary, though I am using AssertJ for the fluent startsWith
assertion.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With