I really like the RxJava, it's a wonderful tool but somethimes it's very hard to understand how it works. We use Retrofit with a RxJava in our Android project and there is a following use-case:
I need to poll the server, with some delay between retries, while server is doing some job. When server is done I have to deliver the result. So I've successfully done it with RxJava, here is the code snippet: I used "skipWhile" with "repeatWhen"
Subscription checkJobSubscription = mDataManager.checkJob(prepareTweakJob)
.skipWhile(new Func1<CheckJobResponse, Boolean>() {
@Override
public Boolean call(CheckJobResponse checkJobResponse) {
boolean shouldSkip = false;
if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, jobStatus " + checkJobResponse.getJobStatus());
switch (checkJobResponse.getJobStatus()){
case CheckJobResponse.PROCESSING:
shouldSkip = true;
break;
case CheckJobResponse.DONE:
case CheckJobResponse.ERROR:
shouldSkip = false;
break;
}
if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, shouldSkip " + shouldSkip);
return shouldSkip;
}
})
.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
if (SHOW_LOGS) Logger.v(TAG, "checkJob, repeatWhen " + observable);
return observable.delay(1, TimeUnit.SECONDS);
}
}).subscribe(new Subscriber<CheckJobResponse>(){
@Override
public void onNext(CheckJobResponse response) {
if (SHOW_LOGS) Logger.v(TAG, "checkJob, onSuccess, response " + response);
}
@Override
public void onError(BaseError error) {
if (SHOW_LOGS) Logger.v(TAG, "checkJob, onError, canEditTimeline, error " + error);
Toast.makeText(ChoseEditOptionActivity.this, R.string.NETWORK__no_internet_message, Toast.LENGTH_LONG).show();
}
@Override
public void onCompleted() {
if (SHOW_LOGS) Logger.v(TAG, "onCompleted");
}
});
The code works fine:
When server responded that job is processing I return "true" from "skipWhile" chain, the original Observable waits for 1 second and do the http request again. This process repeats until I return "false" from "skipWhile" chain.
Here is a few things I don't understand:
I saw in the documentation of "skipWhile" that it will not emit anything (onError, onNext, onComplete) from original Observable until I return "false" from its "call" method. So If it doesn't emit anything why does the "repeatWhen" Observable doing it's job? It waits for one second and run the request again. Who launches it?
The second question is: Why Observable from "repeatWhen" is not running forever, I mean why it stops repeating when I return "false" from "skipWhile"? I get onNext successfully in my Subscriber if I return "false".
In documentation of "repeatWhile" it says that eventually I get a call to "onComplete" in my Subscriber but "onComplete" is never called.
It makes no difference if I change the order of chaining "skipWhile" and "repeatWhen". Why is that ?
I understand that RxJava is opensource and I could just read the code, but as I said - it's really hard to understand.
Thanks.
I've not worked with repeatWhen
before, but this question made me curious, so I did some research.
skipWhile
does emit onError
and onCompleted
, even if it never returns true
before then. As such, repeatWhen
is being called every time checkJob()
emits onCompleted
. That answers question #1.
The rest of the questions are predicated on false assumptions. Your subscription is running forever because your repeatWhen
never terminates. That's because repeatWhen
is a more complex beast than you realize. The Observable
in it emits null
whenever it gets onCompleted
from the source. If you take that and return onCompleted
then it ends, otherwise if you emit anything it retries. Since delay
just takes an emission and delays it, it's always emitting the null
again. As such, it constantly resubscribes.
The answer to #2, then, is that it is running forever; you're probably doing something else outside this code to cancel the subscription. For #3, you never get onCompleted
because it never completes. For #4, the order doesn't matter because you're repeating indefinitely.
The question now is, how do you get the correct behavior? It's as simple as using takeUntil
instead of skipWhile
. That way, you keep repeating until you get the result you want, thus terminating the stream when you want it to end.
Here's a code sample:
Observable<Boolean> source = ...; // Something that eventually emits true
source
.repeatWhen(completed -> completed.delay(1, TimeUnit.SECONDS))
.takeUntil(result -> result)
.filter(result -> result)
.subscribe(
res -> System.out.println("onNext(" + res + ")"),
err -> System.out.println("onError()"),
() -> System.out.println("onCompleted()")
);
In this example, source
is emitting booleans. I repeat every 1 second until the source emits true
. I keep taking until result
is true
. And I filter out all notifications that are false
, so the subscriber doesn't get them until it's true
.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With