Dynamic delay value with repeatWhen()

Tags: rx-java

I'm implementing some polling logic with RxJava. I need to poll an endpoint repeatedly until it tells me to stop. Each response also comes back with a time that I'm supposed to wait before polling again. My logic currently looks something like this:

service.pollEndpoint()
    .repeatWhen(observable -> observable.delay(5000, TimeUnit.MILLISECONDS))
    .takeUntil(Blah::shouldStopPolling);

The delay value is hardcoded to 5000 for now, but I'd like it to depend on a value in the poll response. I tried using a flatMap that returned Observable.just(pollResponse).repeatWhen(observable -> observable.delay(pollResponse.getDelay(), TimeUnit.MILLISECONDS)), but that didn't seem like the right idea, since it repeats the inner just(pollResponse) rather than re-polling the source Observable. I feel like I'm overlooking something simple. Thanks!

asked Apr 29 '16 00:04 by telkins


2 Answers

As @JohnWowUs mentioned, you need out-of-band communication, but if you subscribe to the sequence more than once, you can use defer to have per-subscriber state:

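Observable.defer(() -> {
    // One-element array: a mutable holder that the lambdas below can capture.
    // defer() creates a fresh holder for each subscriber.
    int[] pollDelay = { 0 };
    return service.pollEndpoint()
        .doOnNext(response -> pollDelay[0] = response.getDelay())
        // timer() is created on every repeat, so it reads the latest delay
        .repeatWhen(o -> o.flatMap(v -> Observable.timer(pollDelay[0], TimeUnit.MILLISECONDS)))
        .takeUntil(Blah::shouldStopPolling);
});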
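For readers who want to see the intended control flow without RxJava on the classpath, here is a minimal plain-Java sketch of the same idea: poll, stop if the response says so, otherwise wait for the time that response carried. It is not the RxJava chain above, just a blocking loop with the same ordering. Every name in it (DynamicDelayPolling, PollResponse, pollUntilStopped, the sleeper parameter) is hypothetical and not part of the original question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

// Plain-Java sketch of response-driven polling; all names here are hypothetical.
final class DynamicDelayPolling {

    // Stand-in for the poll response: a delay plus a stop flag.
    static final class PollResponse {
        final long delayMillis;
        final boolean stop;

        PollResponse(long delayMillis, boolean stop) {
            this.delayMillis = delayMillis;
            this.stop = stop;
        }
    }

    // Polls until a response asks to stop. The stopping response is still
    // returned, mirroring how takeUntil(predicate) emits the matching item.
    static List<PollResponse> pollUntilStopped(Supplier<PollResponse> endpoint,
                                               LongConsumer sleeper) {
        List<PollResponse> seen = new ArrayList<>();
        while (true) {
            PollResponse response = endpoint.get();
            seen.add(response);
            if (response.stop) {
                return seen;
            }
            // The wait comes from the response just received, not a constant.
            sleeper.accept(response.delayMillis);
        }
    }

    public static void main(String[] args) {
        Iterator<PollResponse> canned = Arrays.asList(
                new PollResponse(100, false),
                new PollResponse(250, false),
                new PollResponse(0, true)).iterator();
        List<Long> waits = new ArrayList<>();
        // Record the waits instead of sleeping so the demo finishes instantly.
        List<PollResponse> seen = pollUntilStopped(canned::next, ms -> waits.add(ms));
        System.out.println("polls=" + seen.size() + " waits=" + waits);
        // prints "polls=3 waits=[100, 250]"
    }
}
```

The sleeper is injected so the loop can be exercised without real waiting; in actual use it would wrap something like Thread.sleep.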
answered Sep 23 '22 02:09 by akarnokd


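You could use the side-effect operator doOnNext to update a delay variable and then read it in your repeatWhen. A lambda cannot assign to a local int, so the variable needs a mutable holder such as an AtomicInteger:

// requires java.util.concurrent.atomic.AtomicInteger
AtomicInteger pollDelay = new AtomicInteger(5000);

service.pollEndpoint()
    .doOnNext(pollResponse -> pollDelay.set(pollResponse.getDelay()))
    // flatMap + timer reads pollDelay on every repeat;
    // observable.delay(pollDelay.get(), ...) would read it only once, at subscription
    .repeatWhen(observable -> observable.flatMap(v -> Observable.timer(pollDelay.get(), TimeUnit.MILLISECONDS)))
    .takeUntil(Blah::shouldStopPolling);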
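One detail worth spelling out when sharing a delay variable this way: the function passed to repeatWhen is invoked once per subscription, so any value passed as a plain argument to delay(...) inside it is read a single time, before the first response arrives. The plain-Java sketch below mimics that difference without RxJava. The names (EagerVsLazyDelay, eagerHandler, lazyHandler, delaysUsed) are hypothetical stand-ins, not RxJava API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

// Hypothetical model of "handler runs once, rule is applied on every repeat".
final class EagerVsLazyDelay {

    // Reads the variable when the handler is built, like delay(pollDelay, ...).
    static IntSupplier eagerHandler(AtomicInteger pollDelay) {
        int captured = pollDelay.get();
        return () -> captured;
    }

    // Reads the variable each time a repeat is scheduled,
    // like flatMap(v -> timer(pollDelay, ...)).
    static IntSupplier lazyHandler(AtomicInteger pollDelay) {
        return pollDelay::get;
    }

    // Simulates a run: each response updates pollDelay (the doOnNext side
    // effect), then the rule decides how long to wait before the next poll.
    static List<Integer> delaysUsed(IntSupplier rule, AtomicInteger pollDelay,
                                    int[] responseDelays) {
        List<Integer> used = new ArrayList<>();
        for (int delay : responseDelays) {
            pollDelay.set(delay);
            used.add(rule.getAsInt());
        }
        return used;
    }

    public static void main(String[] args) {
        int[] responses = { 100, 250, 400 };

        AtomicInteger a = new AtomicInteger(5000);
        System.out.println("eager: " + delaysUsed(eagerHandler(a), a, responses));

        AtomicInteger b = new AtomicInteger(5000);
        System.out.println("lazy:  " + delaysUsed(lazyHandler(b), b, responses));
    }
}
```

In this model the eager rule keeps using the initial 5000 for every repeat, while the lazy rule follows the values from the responses.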
answered Sep 24 '22 02:09 by JohnWowUs