
Using "skipWhile" combined with "repeatWhen" in RxJava to implement server polling

I really like RxJava; it's a wonderful tool, but sometimes it's very hard to understand how it works. We use Retrofit with RxJava in our Android project, and we have the following use case:

I need to poll the server, with some delay between retries, while the server is doing some job. When the server is done, I have to deliver the result. I've successfully done this with RxJava using "skipWhile" with "repeatWhen". Here is the code snippet:

Subscription checkJobSubscription = mDataManager.checkJob(prepareTweakJob)
        .skipWhile(new Func1<CheckJobResponse, Boolean>() {

            @Override
            public Boolean call(CheckJobResponse checkJobResponse) {
                boolean shouldSkip = false;

                if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, jobStatus " + checkJobResponse.getJobStatus());

                switch (checkJobResponse.getJobStatus()){
                    case CheckJobResponse.PROCESSING:
                        shouldSkip = true;
                        break;
                    case CheckJobResponse.DONE:
                    case CheckJobResponse.ERROR:
                        shouldSkip = false;
                        break;
                }
                if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, shouldSkip " + shouldSkip);

                return shouldSkip;
            }
        })
        .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
            @Override
            public Observable<?> call(Observable<? extends Void> observable) {
                if (SHOW_LOGS) Logger.v(TAG, "checkJob, repeatWhen " + observable);
                return observable.delay(1, TimeUnit.SECONDS);
            }
        }).subscribe(new Subscriber<CheckJobResponse>(){
            @Override
            public void onNext(CheckJobResponse response) {
                if (SHOW_LOGS) Logger.v(TAG, "checkJob, onSuccess, response " + response);

            }

            @Override
            public void onError(Throwable error) {
                if (SHOW_LOGS) Logger.v(TAG, "checkJob, onError, canEditTimeline, error " + error);
                Toast.makeText(ChoseEditOptionActivity.this, R.string.NETWORK__no_internet_message, Toast.LENGTH_LONG).show();

            }

            @Override
            public void onCompleted() {
                if (SHOW_LOGS) Logger.v(TAG, "onCompleted");
            }
        });

The code works fine:

When the server responds that the job is still processing, I return "true" from the "skipWhile" predicate, and the original Observable waits for 1 second and makes the HTTP request again. This repeats until I return "false" from "skipWhile".

Here are a few things I don't understand:

  1. I saw in the documentation of "skipWhile" that it will not emit anything (onError, onNext, onCompleted) from the original Observable until I return "false" from its "call" method. So if it doesn't emit anything, why does the "repeatWhen" Observable do its job? It waits for one second and runs the request again. Who launches it?

  2. Why is the Observable from "repeatWhen" not running forever? I mean, why does it stop repeating when I return "false" from "skipWhile"? I get onNext successfully in my Subscriber if I return "false".

  3. The documentation of "repeatWhen" says that I eventually get a call to "onCompleted" in my Subscriber, but "onCompleted" is never called.

  4. It makes no difference if I change the order of chaining "skipWhile" and "repeatWhen". Why is that?

I understand that RxJava is open source and I could just read the code, but as I said, it's really hard to understand.

Thanks.

Danylo Volokh asked Jan 22 '16 09:01


1 Answer

I've not worked with repeatWhen before, but this question made me curious, so I did some research.

skipWhile does forward onError and onCompleted, even if its predicate never returns false before then. As such, repeatWhen is notified every time checkJob() emits onCompleted. That answers question #1.
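To make that concrete, here is a minimal plain-Java sketch of what a subscriber downstream of skipWhile would see for a single subscription. It is not RxJava code: the class SkipWhileModel, its events method, and the status strings are hypothetical names used only to mimic the behaviour described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java model of skipWhile for one subscription.
// It only mimics which events reach the subscriber; it is not RxJava.
final class SkipWhileModel {

    static List<String> events(List<String> statuses) {
        List<String> seen = new ArrayList<>();
        boolean skipping = true;
        for (String status : statuses) {
            // Items are dropped only while the predicate keeps returning true.
            if (skipping && status.equals("PROCESSING")) {
                continue;
            }
            skipping = false; // once the predicate returns false, everything passes
            seen.add("onNext(" + status + ")");
        }
        // The terminal event is forwarded no matter what was skipped.
        seen.add("onCompleted()");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(events(Arrays.asList("PROCESSING"))); // [onCompleted()]
        System.out.println(events(Arrays.asList("DONE")));       // [onNext(DONE), onCompleted()]
    }
}
```

In the first call every item is skipped, yet the completion still arrives, and that completion is what repeatWhen reacts to.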

The rest of the questions are predicated on false assumptions. Your subscription is running forever because your repeatWhen never terminates. That's because repeatWhen is a more complex beast than you realize. The Observable passed to its function emits null whenever the source calls onCompleted. If the Observable you return from that function calls onCompleted, the repetition ends; if it emits anything, repeatWhen resubscribes to the source. Since delay just takes an emission and delays it, it always emits the null again. As such, it constantly resubscribes.

The answer to #2, then, is that it is running forever; you're probably doing something else outside this code to cancel the subscription. For #3, you never get onCompleted because it never completes. For #4, the order doesn't matter because you're repeating indefinitely.
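The resubscription rule can be sketched in plain Java as a loop. This is only a model under simplifying assumptions, not how RxJava implements repeatWhen: RepeatWhenModel, countSubscriptions, and the safetyCap parameter are hypothetical, and the cap exists purely so the "repeat forever" case stops in a demo.

```java
import java.util.function.IntPredicate;

// Hypothetical model of the repeatWhen decision: after each completion of the
// source, the handler either emits (resubscribe) or completes (stop).
final class RepeatWhenModel {

    // Returns how many times the source was subscribed to.
    // handlerEmits.test(n) == true  -> handler emitted after the n-th completion
    // handlerEmits.test(n) == false -> handler completed, so repeating ends
    static int countSubscriptions(IntPredicate handlerEmits, int safetyCap) {
        int subscriptions = 0;
        while (subscriptions < safetyCap) {
            subscriptions++; // subscribe; the source runs and calls onCompleted
            if (!handlerEmits.test(subscriptions)) {
                break; // handler completed: the whole stream completes
            }
            // handler emitted something: loop around and resubscribe
        }
        return subscriptions;
    }

    public static void main(String[] args) {
        // A handler like delay(...) re-emits every notification, so only the cap stops it.
        System.out.println(countSubscriptions(n -> true, 10));  // 10
        // A handler that completes after the third completion ends the repetition.
        System.out.println(countSubscriptions(n -> n < 3, 10)); // 3
    }
}
```

The first call corresponds to the code in the question: nothing in the handler ever completes, so the only way out is to unsubscribe or to end the stream from downstream.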

The question now is, how do you get the correct behavior? It's as simple as using takeUntil instead of skipWhile. That way, you keep repeating until you get the result you want, thus terminating the stream when you want it to end.

Here's a code sample:

Observable<Boolean> source = ...; // Something that eventually emits true

source
    .repeatWhen(completed -> completed.delay(1, TimeUnit.SECONDS))
    .takeUntil(result -> result)
    .filter(result -> result)
    .subscribe(
        res -> System.out.println("onNext(" + res + ")"),
        err -> System.out.println("onError()"),
        () -> System.out.println("onCompleted()")
    );

In this example, source is emitting booleans. I repeat every 1 second until the source emits true. I keep taking until result is true. And I filter out all notifications that are false, so the subscriber doesn't get them until it's true.
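For readers who want to trace the event order, here is a small plain-Java model of the chain above, assuming each poll yields a single Boolean. PollUntilTrueModel and its events method are hypothetical names; the loop stands in for the resubscriptions and does not use RxJava.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of repeatWhen + takeUntil(result -> result) + filter(result -> result).
final class PollUntilTrueModel {

    // Returns the events the subscriber would see, given one result per poll.
    static List<String> events(List<Boolean> pollResults) {
        List<String> seen = new ArrayList<>();
        for (boolean result : pollResults) { // one iteration per (re)subscription
            if (result) {
                // filter lets only true through to the subscriber
                seen.add("onNext(" + result + ")");
                // takeUntil passes the matching item on and then completes the stream
                seen.add("onCompleted()");
                return seen;
            }
            // false: takeUntil keeps going and filter drops the item
        }
        return seen; // the simulated polls ran out before a true arrived
    }

    public static void main(String[] args) {
        System.out.println(events(Arrays.asList(false, false, true, true)));
        // [onNext(true), onCompleted()]
    }
}
```

The fourth poll in the sample input is never reached, which mirrors the point of takeUntil: once the wanted result arrives, the stream terminates and no further resubscription happens.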

Dan Lew answered Sep 28 '22 01:09