I'm trying to understand RxJava. My test code is:
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import java.util.concurrent.TimeUnit;

public class Hello {
    public static void main(String[] args) {
        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    Thread.sleep(1000);
                    subscriber.onNext("a");
                    Thread.sleep(1000);
                    subscriber.onNext("b");
                    Thread.sleep(1000);
                    subscriber.onNext("c");
                    Thread.sleep(1000);
                    subscriber.onNext("d");
                    Thread.sleep(1000);
                    subscriber.onNext("e");
                    Thread.sleep(1000);
                    subscriber.onNext("f");
                    Thread.sleep(1000);
                    subscriber.onNext("g");
                    Thread.sleep(1000);
                    subscriber.onNext("h");
                } catch (InterruptedException e) {
                    subscriber.onError(e);
                }
            }
        });
        observable
            .delay(2, TimeUnit.SECONDS)
            .subscribe(new Action1<String>() {
                @Override
                public void call(String string) {
                    System.out.println(string);
                }
            });
    }
}
Without .delay(2, TimeUnit.SECONDS) I get the output:
a
b
c
d
e
f
g
h
But with .delay(2, TimeUnit.SECONDS) the output lacks "g" and "h":
a
b
c
d
e
f
How can that be? The documentation says that delay just emits the items emitted by the source Observable, shifted forward in time by a specified delay.
The delay overload that you are using schedules work on a different thread, which results in an implicit race condition.
All temporal operators (such as delay, buffer, and window) need a scheduler to run their effect later, and this can cause unexpected race conditions if you aren't aware of it and don't use them carefully. In this case the delay operator schedules the downstream work on a separate thread pool. Here is the order of execution (on the main thread) in your test:
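1. onNext("a") goes to delay. Delay schedules the onNext of "a" for 2 seconds later.
2. onNext("b") goes to delay. Delay schedules the onNext of "b" for 2 seconds later.
3. ... and so on for each item, one second apart ...
4. onNext("h") goes to delay. Delay schedules the work, then subscribe returns immediately and main terminates your test (causing the scheduled work to disappear).
At that point only the deliveries that were already due have run. "g" and "h" are due one and two seconds after main returns, and the scheduler's worker threads are daemon threads that do not keep the JVM alive, so those two items are never printed.
To make the delayed work run on the calling thread instead, so that subscribe does not return before every item has been delivered, you can schedule the delay on the trampoline scheduler (rx.schedulers.Schedulers):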
.delay(2, TimeUnit.SECONDS, Schedulers.trampoline())
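The difference between the two scheduling choices can be reproduced without RxJava. The following is only a minimal sketch using java.util.concurrent, not RxJava's actual implementation: the class DelayRaceSketch and its methods are hypothetical names, a daemon-thread ScheduledExecutorService stands in for the background scheduler, sleeping on the caller stands in for the trampoline scheduler, and the timings are scaled down from seconds to milliseconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the question's program; this is not RxJava code.
class DelayRaceSketch {

    // Sleeps on the current thread; restores the interrupt flag if interrupted.
    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Models delay on a background scheduler: each delivery is handed to a
    // daemon thread. Returns whatever had been delivered by the time the
    // last item was emitted, which is when main would return.
    static List<String> deliverOnDaemonPool(String[] items, long pauseMs, long delayMs) {
        List<String> delivered = new CopyOnWriteArrayList<>();
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true); // a daemon thread does not keep the JVM alive
            return t;
        });
        for (String item : items) {
            pause(pauseMs);
            pool.schedule(() -> { delivered.add(item); }, delayMs, TimeUnit.MILLISECONDS);
        }
        List<String> snapshot = new ArrayList<>(delivered);
        pool.shutdownNow(); // drop pending deliveries, as a JVM exit would
        return snapshot;
    }

    // Models delay on the calling thread: the caller waits out each delay
    // itself, so every item is delivered before the method returns.
    static List<String> deliverOnCallingThread(String[] items, long pauseMs, long delayMs) {
        List<String> delivered = new ArrayList<>();
        for (String item : items) {
            pause(pauseMs);
            pause(delayMs); // the caller blocks for the whole delay
            delivered.add(item);
        }
        return delivered;
    }

    public static void main(String[] args) {
        String[] items = {"a", "b", "c", "d", "e", "f", "g", "h"};
        System.out.println("daemon pool:    " + deliverOnDaemonPool(items, 10, 20));
        System.out.println("calling thread: " + deliverOnCallingThread(items, 10, 20));
    }
}
```

The first line usually lacks the last items, though exactly how many depends on timing. The second line always contains all eight. The price of the calling-thread variant is that each item now blocks the emitter for the full delay, so the whole run takes correspondingly longer.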