I'm trying to implement a helper method on observables that returns a new observable emitting only the values until a timeout is reached:
implicit class ObservableOps[T](obs: Observable[T]) {
def timedOut(totalSec: Long): Observable[T] = {
require(totalSec >= 0)
val timeOut = Observable.interval(totalSec seconds)
.filter(_ > 0)
.take(1)
obs.takeUntil(timeOut)
}
}
I wrote a test for it, which creates an observable emitting its first value long after the timeout. However, the resulting observable still seems to include the late value:
test("single value too late for timeout") {
val obs = Observable({Thread.sleep(8000); 1})
val list = obs.timedOut(1).toBlockingObservable.toList
assert(list === List())
}
The test fails with the message List(1) did not equal List(). What am I doing wrong?
I suspect that your Thread.sleep(8000) is actually blocking your main thread. Did you try to add a println after val obs in your test to see if it appears right after the test starts?
What's happening here is that your declaration of obs blocks your program for 8 seconds, then you create your new observable using timedOut, such that timedOut see the emitted value as soon as it's called.
Using rx-scala 0.23.0 your timedOut method works (excepted that Observable.interval doesn't emit immediately so the filter(_ > 0) should be removed).
val obs = Observable.just(42).delay(900.millis)
val list = obs.timedOut(1).toBlocking.toList
println(list) // prints List(42)
val obs = Observable.just(42).delay(1100.millis)
val list = obs.timedOut(1).toBlocking.toList
println(list) // prints List()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With