I'm trying to test the following RxKotlin/RxJava 2 code:
validate(data)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap { ... }
I'm attempting to override the schedulers as follows:
// Runs before each test suite
RxJavaPlugins.setInitIoSchedulerHandler { Schedulers.trampoline() }
RxAndroidPlugins.setInitMainThreadSchedulerHandler { Schedulers.trampoline() }
However, I get the following error when running the test:
java.lang.ExceptionInInitializerError
...
Caused by: java.lang.NullPointerException: Scheduler Callable result can't be null
at io.reactivex.internal.functions.ObjectHelper.requireNonNull(ObjectHelper.java:39)
at io.reactivex.plugins.RxJavaPlugins.applyRequireNonNull(RxJavaPlugins.java:1317)
at io.reactivex.plugins.RxJavaPlugins.initIoScheduler(RxJavaPlugins.java:306)
at io.reactivex.schedulers.Schedulers.<clinit>(Schedulers.java:84)
Has anyone experienced this problem?
The test worked fine when using RxKotlin/RxJava 1 and the following scheduler overrides:
RxAndroidPlugins.getInstance().registerSchedulersHook(object : RxAndroidSchedulersHook() {
    override fun getMainThreadScheduler() = Schedulers.immediate()
})
RxJavaPlugins.getInstance().registerSchedulersHook(object : RxJavaSchedulersHook() {
    override fun getIOScheduler() = Schedulers.immediate()
})
I suggest a different approach: add a layer of abstraction over your schedulers. There is a nice article about it.
It would look something like this in Kotlin:
interface SchedulerProvider {
    fun ui(): Scheduler
    fun computation(): Scheduler
    fun trampoline(): Scheduler
    fun newThread(): Scheduler
    fun io(): Scheduler
}
Then you implement SchedulerProvider for production code:
class AppSchedulerProvider : SchedulerProvider {
    override fun ui(): Scheduler {
        return AndroidSchedulers.mainThread()
    }
    override fun computation(): Scheduler {
        return Schedulers.computation()
    }
    override fun trampoline(): Scheduler {
        return Schedulers.trampoline()
    }
    override fun newThread(): Scheduler {
        return Schedulers.newThread()
    }
    override fun io(): Scheduler {
        return Schedulers.io()
    }
}
And one for testing classes:
class TestSchedulerProvider : SchedulerProvider {
    override fun ui(): Scheduler {
        return Schedulers.trampoline()
    }
    override fun computation(): Scheduler {
        return Schedulers.trampoline()
    }
    override fun trampoline(): Scheduler {
        return Schedulers.trampoline()
    }
    override fun newThread(): Scheduler {
        return Schedulers.trampoline()
    }
    override fun io(): Scheduler {
        return Schedulers.trampoline()
    }
}
Your code would look like this where you call RxJava:
mCompositeDisposable.add(mDataManager.getQuote()
    .subscribeOn(mSchedulerProvider.io())
    .observeOn(mSchedulerProvider.ui())
    .subscribe(Consumer<Quote> {
        ...
And you'll just override your implementation of SchedulerProvider based on where you test it. Here's a sample project for reference, I am linking the test file that would use the testable-version of SchedulerProvider: https://github.com/Obaied/DingerQuotes/blob/master/app/src/test/java/com/obaied/dingerquotes/QuotePresenterTest.kt#L31
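To make the injection idea concrete, here is a minimal sketch of a class that receives a SchedulerProvider through its constructor and can be exercised synchronously in a test. QuoteLoader, fetchQuote and loadWithTestSchedulers are hypothetical names invented for this illustration (they are not taken from the linked project), and the interface is trimmed to two methods so the snippet compiles on its own with only RxJava 2 on the classpath.

```kotlin
import io.reactivex.Scheduler
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers

// Trimmed-down copy of the interface from this answer (only the two methods used here).
interface SchedulerProvider {
    fun ui(): Scheduler
    fun io(): Scheduler
}

// Test implementation: everything runs on the calling thread.
class TestSchedulerProvider : SchedulerProvider {
    override fun ui(): Scheduler = Schedulers.trampoline()
    override fun io(): Scheduler = Schedulers.trampoline()
}

// Hypothetical class under test: it never touches Schedulers.io() or
// AndroidSchedulers.mainThread() directly, only the injected provider.
class QuoteLoader(
    private val fetchQuote: () -> Single<String>,
    private val schedulers: SchedulerProvider
) {
    var lastQuote: String? = null
        private set

    fun load() {
        fetchQuote()
            .subscribeOn(schedulers.io())
            .observeOn(schedulers.ui())
            .subscribe({ quote -> lastQuote = quote }, { error -> error.printStackTrace() })
    }
}

// Hypothetical usage: with trampoline schedulers the result is available
// as soon as load() returns, so no waiting or latches are needed.
fun loadWithTestSchedulers(): String? {
    val loader = QuoteLoader({ Single.just("hello") }, TestSchedulerProvider())
    loader.load()
    return loader.lastQuote
}
```

Because no global plugin state is touched, tests written this way should not interfere with each other, at the cost of threading the provider through every constructor.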
Figured it out! It had to do with the fact that in this code:
validate(data)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap { ... }
validate(data) was returning an Observable, which was emitting the following: emitter.onNext(null). Since RxJava 2 no longer accepts null values, flatMap was not getting called. I changed validate to return a Completable and updated the scheduler override to the following:
RxJavaPlugins.setIoSchedulerHandler { Schedulers.trampoline() }
Now the tests pass!
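For readers who want to see the shape of that change, here is a rough sketch under a few assumptions: the validation rule, the process function (standing in for whatever the flatMap body did) and runSynchronously are all hypothetical, and the observeOn(AndroidSchedulers.mainThread()) step is left out so the snippet runs without Android. Note that the non-Init handler is consulted each time Schedulers.io() is called, so it has to be installed before the chain is assembled.

```kotlin
import io.reactivex.Completable
import io.reactivex.Observable
import io.reactivex.plugins.RxJavaPlugins
import io.reactivex.schedulers.Schedulers

// Hypothetical validation: completes if the input is acceptable, errors otherwise.
// A Completable has no items to emit, so there is nothing that could be null.
fun validate(data: String): Completable =
    Completable.fromAction {
        require(data.isNotBlank()) { "data must not be blank" }
    }

// Hypothetical follow-up work, chained with andThen instead of flatMap.
fun process(data: String): Observable<String> =
    validate(data)
        .subscribeOn(Schedulers.io())
        .andThen(Observable.just(data.trim()))

// Installs the override, runs the chain on the calling thread, then cleans up.
fun runSynchronously(data: String): List<String> {
    RxJavaPlugins.setIoSchedulerHandler { Schedulers.trampoline() }
    try {
        return process(data).test().assertComplete().values()
    } finally {
        RxJavaPlugins.reset()
    }
}
```

Calling runSynchronously(" abc ") should yield a single trimmed item; a blank input would surface as an IllegalArgumentException through onError rather than as a silently skipped step.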
As an alternative to the proposed solutions, the following JUnit rule has been working fine in my projects for a while. You can use it in your test classes like this:
@get:Rule
val immediateSchedulersRule = ImmediateSchedulersRule()
And the class looks like this:
class ImmediateSchedulersRule : ExternalResource() {
    val immediateScheduler: Scheduler = object : Scheduler() {
        override fun createWorker() = ExecutorScheduler.ExecutorWorker(Executor { it.run() })

        // This prevents errors when scheduling a delay
        override fun scheduleDirect(run: Runnable, delay: Long, unit: TimeUnit): Disposable {
            return super.scheduleDirect(run, 0, unit)
        }
    }

    override fun before() {
        RxJavaPlugins.setIoSchedulerHandler { immediateScheduler }
        RxJavaPlugins.setComputationSchedulerHandler { immediateScheduler }
        RxJavaPlugins.setNewThreadSchedulerHandler { immediateScheduler }
        RxAndroidPlugins.setInitMainThreadSchedulerHandler { immediateScheduler }
        RxAndroidPlugins.setMainThreadSchedulerHandler { immediateScheduler }
    }

    override fun after() {
        RxJavaPlugins.reset()
        // Also clear the RxAndroid handlers installed in before()
        RxAndroidPlugins.reset()
    }
}
You can find a way to migrate from TestRule to ExternalResource here and get more info on testing RxJava 2 here.
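To illustrate what the before/after pair of such a rule does to global state, here is a small sketch without JUnit or Android. It swaps in Schedulers.trampoline() instead of the custom immediateScheduler above, purely to keep the example short, and overrideThenReset is a hypothetical helper name.

```kotlin
import io.reactivex.plugins.RxJavaPlugins
import io.reactivex.schedulers.Schedulers

// Reports whether Schedulers.computation() resolves to the trampoline scheduler
// while the override is installed, and again after RxJavaPlugins.reset().
fun overrideThenReset(): Pair<Boolean, Boolean> {
    // Equivalent of before(): install the handler.
    RxJavaPlugins.setComputationSchedulerHandler { Schedulers.trampoline() }
    val duringOverride = Schedulers.computation() === Schedulers.trampoline()

    // Equivalent of after(): remove every RxJavaPlugins handler.
    RxJavaPlugins.reset()
    val afterReset = Schedulers.computation() === Schedulers.trampoline()

    return duringOverride to afterReset
}
```

The handler is applied on every call to Schedulers.computation(), which is why the override takes effect immediately and disappears again once reset() has run.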