Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Observable takeUntil misbehaving

Tags:

scala

rx-java

I'm trying to implement a helper method on observables that returns a new observable emitting only the values until a timeout is reached:

implicit class ObservableOps[T](obs: Observable[T]) {

  def timedOut(totalSec: Long): Observable[T] = {
    require(totalSec >= 0)
    val timeOut = Observable.interval(totalSec seconds)
      .filter(_ > 0)
      .take(1)
    obs.takeUntil(timeOut)
  }
}

I wrote a test for it, which creates an observable emitting its first value long after the timeout. However, the resulting observable still seems to include the late value:

test("single value too late for timeout") {
  val obs = Observable({Thread.sleep(8000); 1})
  val list = obs.timedOut(1).toBlockingObservable.toList
  assert(list === List())
}

The test fails with the message List(1) did not equal List(). What am I doing wrong?

like image 998
Zoltán Avatar asked May 13 '26 10:05

Zoltán


1 Answers

I suspect that your Thread.sleep(8000) is actually blocking your main thread. Did you try to add a println after val obs in your test to see if it appears right after the test starts?

What's happening here is that your declaration of obs blocks your program for 8 seconds, then you create your new observable using timedOut, such that timedOut see the emitted value as soon as it's called.

Using rx-scala 0.23.0 your timedOut method works (excepted that Observable.interval doesn't emit immediately so the filter(_ > 0) should be removed).

val obs = Observable.just(42).delay(900.millis)
val list = obs.timedOut(1).toBlocking.toList
println(list) // prints List(42)

val obs = Observable.just(42).delay(1100.millis)
val list = obs.timedOut(1).toBlocking.toList
println(list) // prints List()
like image 90
Dimitri Avatar answered May 16 '26 06:05

Dimitri



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!