
RxJava. Observable.delay work strange (lacks some items at the end)

Tags: java, rx-java

I'm trying to understand RxJava. My test code is:

import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;

import java.util.concurrent.TimeUnit;

public class Hello {
    public static void main(String[] args) {

        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    Thread.sleep(1000);
                    subscriber.onNext("a");
                    Thread.sleep(1000);
                    subscriber.onNext("b");
                    Thread.sleep(1000);
                    subscriber.onNext("c");
                    Thread.sleep(1000);
                    subscriber.onNext("d");
                    Thread.sleep(1000);
                    subscriber.onNext("e");
                    Thread.sleep(1000);
                    subscriber.onNext("f");
                    Thread.sleep(1000);
                    subscriber.onNext("g");
                    Thread.sleep(1000);
                    subscriber.onNext("h");
                } catch (InterruptedException e) {
                    subscriber.onError(e);
                }
            }
        });

        observable
                .delay(2, TimeUnit.SECONDS)
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String string) {
                        System.out.println(string);
                    }
                });
    }
}

Without .delay(2, TimeUnit.SECONDS) I get the output: a b c d e f g h. With .delay(2, TimeUnit.SECONDS) the output lacks "g" and "h": a b c d e f

How can that be? The documentation says that delay just "emits the items emitted by the source Observable shifted forward in time by a specified delay".

Asked by wilddev, Dec 11 '22 20:12


1 Answer

The delay overload you are using schedules its work on a different thread, which creates an implicit race condition. All temporal operators (such as delay, buffer, and window) need a scheduler to run their effect later, and that can cause unexpected races if you are not aware of it. In this case delay schedules the downstream onNext calls on a separate thread pool (by default the computation scheduler, whose daemon threads do not keep the JVM alive). Here is the order of execution on the main thread in your test:

  1. Your Observable is subscribed to and waits 1000 ms before calling onNext("a").
  2. The item is received by delay, which schedules the downstream onNext for 2 seconds later.
  3. Control returns immediately to your Observable, which waits another 1000 ms.
  4. Your Observable passes onNext("b") to delay, which schedules the onNext of "b" for 2 seconds later.
  5. ... (the same steps repeat for each following item)
  6. When your Observable calls onNext("h"), delay schedules that item as well. subscribe then returns immediately, main exits, and the test terminates, so the work still waiting on the scheduler ("g" and "h") is never delivered.
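
The timing in the steps above can be checked with a little arithmetic. The sketch below does not use RxJava; it is a plain-Java model that assumes each item is emitted 1000 ms after the previous one, is delivered 2000 ms after it is emitted, and that the program exits right after the last onNext. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class DelayTimeline {
    // Returns the items whose delayed delivery time falls strictly before
    // the moment the producer returns (the point where main() would exit).
    static List<String> deliveredBeforeExit(String[] items, long intervalMs, long delayMs) {
        // Assumption: the producer returns right after the last onNext.
        long exitAtMs = items.length * intervalMs;
        List<String> delivered = new ArrayList<>();
        for (int i = 0; i < items.length; i++) {
            long emittedAtMs = (i + 1) * intervalMs;    // sleep, then onNext
            long deliveredAtMs = emittedAtMs + delayMs; // shifted by the delay
            if (deliveredAtMs < exitAtMs) {
                delivered.add(items[i]);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        String[] items = {"a", "b", "c", "d", "e", "f", "g", "h"};
        // Prints [a, b, c, d, e]
        System.out.println(deliveredBeforeExit(items, 1000, 2000));
    }
}
```

In this model "a" through "e" are due before the exit at 8000 ms. "f" is due at exactly 8000 ms, so whether it is printed depends on the race (in the question's run it was), while "g" and "h" are due at 9000 ms and 10000 ms, after the program has already ended.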

To make the delay run synchronously on the calling thread, you can pass the trampoline scheduler to delay. Each onNext then blocks until its item has been delivered, so nothing is still pending when main returns:

.delay(2, TimeUnit.SECONDS, Schedulers.trampoline())
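
To see why running the delay on the calling thread avoids the lost items, here is a rough plain-Java analogy. It does not use RxJava and is not how the trampoline scheduler is implemented; the hypothetical delayed helper simply sleeps on the caller's thread and then passes the item downstream, and the delay is shortened to a few milliseconds so the example finishes quickly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class CallerThreadDelay {
    // Hypothetical stand-in for a delay that runs on the calling thread:
    // sleep first, then hand the item downstream before returning.
    static <T> Consumer<T> delayed(long delayMs, Consumer<T> downstream) {
        return item -> {
            try {
                Thread.sleep(delayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return;
            }
            downstream.accept(item);
        };
    }

    // Emits "a".."h" through the delayed consumer and returns what arrived
    // by the time the producer loop has finished.
    static List<String> run(long delayMs) {
        List<String> received = new ArrayList<>();
        Consumer<String> onNext = delayed(delayMs, received::add);
        for (String s : new String[] {"a", "b", "c", "d", "e", "f", "g", "h"}) {
            onNext.accept(s); // does not return until the item is delivered
        }
        return received;
    }

    public static void main(String[] args) {
        // Prints [a, b, c, d, e, f, g, h]
        System.out.println(run(20));
    }
}
```

Because every call blocks until its item has been handed over, all eight items have arrived by the time the loop ends. The cost is that the producer is held up by the delay on every item.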
Answered by Aaron, Dec 13 '22 08:12