I'm trying to test a function in which the elements of a stream are dispatched one by one after a delay. I was able to get my tests working using Thread.sleep. However, when I use TestScheduler.advanceTimeBy, I can't get any result.
Here is the code:
public Observable<Object> getDelayedObjects(Observable<Observable<Object>> objectsStreams) {
    // Emit each object of each (repeating) inner stream after its own delay.
    return objectsStreams.concatMap(objectsStream ->
        objectsStream.repeat().concatMap(object ->
            Observable.just(object)
                .delay(getDuration(object), TimeUnit.MILLISECONDS)));
}
And the test code:
TestScheduler testScheduler = new TestScheduler();
BehaviorSubject<Observable<Object>> objectStreamSubject = BehaviorSubject.create(objectsStream);
model.getDelayedObjects(objectStreamSubject)
    .observeOn(testScheduler)
    .subscribeOn(testScheduler)
    .subscribe(testSubscriber);
testScheduler.triggerActions();
// Thread.sleep(900) works with the default scheduler
testScheduler.advanceTimeBy(900, TimeUnit.MILLISECONDS);
testSubscriber.assertReceivedOnNext(objects);
Looking into how TestScheduler is normally used, I found that it is usual to pass the scheduler to the delay function. So I was able to get the tests to pass by providing the scheduler as an argument to getDelayedObjects and then to delay. However, I still don't understand why it was not working before.
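The delay operator, by default, uses the computation scheduler for its time-based delays. This is stated in the method's documentation: look for the value of the @SchedulerSupport annotation, which in this case is io.reactivex:computation. Advancing the virtual clock of a TestScheduler has no effect on the computation scheduler, so the delayed items have not been emitted yet when the assertion runs.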
For testing purposes you have to replace the computation scheduler with a TestScheduler. To do that, use one of the overloads of the delay operator that take a Scheduler.
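To make the difference concrete, here is a toy model in plain Java. It is only a sketch and not RxJava's implementation: ToyVirtualScheduler and VirtualTimeDemo are hypothetical names, and a second toy clock stands in for the default computation scheduler, which in reality runs on real time. The point it illustrates is that advancing one clock only runs the work that was scheduled on that clock.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Toy virtual-time scheduler (hypothetical, NOT RxJava's TestScheduler).
final class ToyVirtualScheduler {
    private static final class Task {
        final long dueMillis;
        final long seq;        // keeps insertion order for tasks due at the same time
        final Runnable action;

        Task(long dueMillis, long seq, Runnable action) {
            this.dueMillis = dueMillis;
            this.seq = seq;
            this.action = action;
        }
    }

    private final PriorityQueue<Task> queue = new PriorityQueue<>(
            Comparator.<Task>comparingLong(t -> t.dueMillis)
                      .thenComparingLong(t -> t.seq));
    private long nowMillis = 0;
    private long nextSeq = 0;

    // Queue an action to run once the virtual clock reaches now + delayMillis.
    void schedule(Runnable action, long delayMillis) {
        queue.add(new Task(nowMillis + delayMillis, nextSeq++, action));
    }

    // Move the virtual clock forward and run every task that became due.
    void advanceTimeBy(long millis) {
        long target = nowMillis + millis;
        while (!queue.isEmpty() && queue.peek().dueMillis <= target) {
            Task task = queue.poll();
            nowMillis = task.dueMillis;
            task.action.run();
        }
        nowMillis = target;
    }
}

final class VirtualTimeDemo {
    static List<String> run() {
        ToyVirtualScheduler testClock = new ToyVirtualScheduler();
        // Assumption: stands in for the scheduler that delay uses by default.
        ToyVirtualScheduler otherClock = new ToyVirtualScheduler();
        List<String> received = new ArrayList<>();

        testClock.schedule(() -> received.add("on test clock"), 300);
        otherClock.schedule(() -> received.add("on other clock"), 300);

        // Only the clock we advance runs its pending work.
        testClock.advanceTimeBy(900);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [on test clock]
    }
}
```

The item scheduled on the other clock never shows up, which mirrors the failing test: the delay was waiting on a scheduler that advanceTimeBy does not control. Passing the TestScheduler to delay corresponds to scheduling on testClock here.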