Right now I'm implementing some polling logic with RxJava. I'm supposed to poll an endpoint a number of times until it tells me to stop. Additionally, each response comes back with a time that I'm supposed to delay by before polling again. My logic looks something like this right now:
service.pollEndpoint()
    .repeatWhen(observable -> observable.delay(5000, TimeUnit.MILLISECONDS))
    .takeUntil(Blah::shouldStopPolling);
Right now I have the delay value hardcoded to 5000, but I'd like it to depend on a value in the poll response. I tried using a flatMap that returned Observable.just(pollResponse).repeatWhen(observable -> observable.delay(pollResponse.getDelay(), TimeUnit.MILLISECONDS)), but that didn't seem like the right idea since it messed with the source Observable. I feel like it's something simple that I'm overlooking. Thanks!
As @JohnWowUs mentioned, you need out-of-band communication, but if you subscribe to the sequence more than once, you can use defer to have per-subscriber state:
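Observable.defer(() -> {
    // One-element array: a mutable holder that the lambdas below can write to and read from
    int[] pollDelay = { 0 };
    return service.pollEndpoint()
        .doOnNext(response -> pollDelay[0] = response.getDelay())
        // timer(...) is created on every repeat, so it sees the latest delay
        .repeatWhen(o -> o.flatMap(v -> Observable.timer(pollDelay[0], TimeUnit.MILLISECONDS)))
        .takeUntil(Blah::shouldStopPolling);
});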
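To illustrate why the holder is allocated inside defer, here is a plain-Java sketch with no RxJava involved. The class, the Handles type, and the subscribe() method are all made up for illustration; subscribe() merely plays the role of the lambda passed to defer, which runs once per subscription.

```java
import java.util.function.IntConsumer;
import java.util.function.IntSupplier;

public class PerSubscriberState {
    // Hypothetical pair of callbacks that share one delay holder.
    static final class Handles {
        final IntConsumer write; // stands in for the doOnNext lambda
        final IntSupplier read;  // stands in for the lambda that builds the timer

        Handles(IntConsumer write, IntSupplier read) {
            this.write = write;
            this.read = read;
        }
    }

    // Stands in for the body of defer: every call allocates a fresh holder.
    static Handles subscribe() {
        // The array reference is effectively final, so both lambdas may capture it,
        // while the element inside it stays mutable.
        int[] pollDelay = { 0 };
        return new Handles(d -> pollDelay[0] = d, () -> pollDelay[0]);
    }

    public static void main(String[] args) {
        Handles first = subscribe();
        Handles second = subscribe();
        first.write.accept(250);                      // only the first "subscriber" gets a response
        System.out.println(first.read.getAsInt());    // prints 250
        System.out.println(second.read.getAsInt());   // prints 0
    }
}
```

If the holder were declared outside subscribe(), both "subscribers" would overwrite each other's delay, which is the situation defer avoids.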
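You could use the side-effect operator doOnNext to update a delay variable and then use that in your repeatWhen. Because a lambda cannot assign to a local variable, keep the delay in a holder such as an AtomicInteger (assuming getDelay() returns an int), and read it inside flatMap so that each repeat uses the latest value:
AtomicInteger pollDelay = new AtomicInteger(5000);
service.pollEndpoint()
    .doOnNext(pollResponse -> pollDelay.set(pollResponse.getDelay()))
    .repeatWhen(observable -> observable.flatMap(v -> Observable.timer(pollDelay.get(), TimeUnit.MILLISECONDS)))
    .takeUntil(Blah::shouldStopPolling);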
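One detail that is easy to miss: the function handed to repeatWhen runs once per subscription, so a value passed straight to delay(...) is read at that moment and never again, whereas flatMap with Observable.timer looks the value up on every repeat. The plain-Java sketch below mimics that difference without RxJava; the class name, the simulate() helper, and the sample delays are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

public class EagerVsLazyDelay {
    // Replays the given response delays and records which delay each strategy
    // would wait after every response.
    static List<String> simulate(int initialDelay, int... responseDelays) {
        AtomicInteger pollDelay = new AtomicInteger(initialDelay);

        // Analogous to delay(pollDelay.get(), ...): read once, during setup.
        int eager = pollDelay.get();
        // Analogous to flatMap(v -> timer(pollDelay.get(), ...)): read on each repeat.
        IntSupplier lazy = pollDelay::get;

        List<String> log = new ArrayList<>();
        for (int d : responseDelays) {
            pollDelay.set(d); // what doOnNext does when a response arrives
            log.add("eager=" + eager + " lazy=" + lazy.getAsInt());
        }
        return log;
    }

    public static void main(String[] args) {
        // prints "eager=5000 lazy=100" then "eager=5000 lazy=250"
        simulate(5000, 100, 250).forEach(System.out::println);
    }
}
```

The eager column never moves off the initial 5000, which is the behaviour you would see if the updated variable were passed directly to delay.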