I am using the Rx-ified API for vert.x, and this question has to do with a potentially infinite retry-until-success loop that I would like to implement but am having difficulty with. I'm new to RxJava.
Here's what I'd like to do:
The first problem I encounter is how to accomplish step 2).
If you are familiar with the vert.x Rx api, this is what it means to issue the request in step 1) above:
vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject );
The above code returns an Observable instance that will emit either the response, or an error (if there was a timeout, for example). That Observable will never emit anything ever again (or else it will always emit exactly the same thing every time something subscribes, I don't know which).
vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject )
.retry()
I thought that in order to issue the retry, I could just use RxJava's retry() operator, which I tried, but doing so has exactly no useful effect whatsoever, due to the nature of the source observable. No new request message gets sent, because the only thing that is being "retried" is the subscription to the original source, which is never going to emit anything different.
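To make the behaviour described above concrete, here is a minimal plain-Java model with no RxJava or vert.x involved. It assumes, as the question suspects, that the request is sent once when the call is made and that its outcome is fixed from then on. `EagerRequestDemo`, `sendOnce`, `failuresSeen`, and the counter are hypothetical names invented for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class EagerRequestDemo {
    // Counts how many requests were actually sent (stand-in for the event bus).
    static final AtomicInteger requestsSent = new AtomicInteger();

    // Models a call that sends the message immediately and returns a handle
    // to its (already failed) outcome.
    static CompletableFuture<String> sendOnce() {
        requestsSent.incrementAndGet();
        CompletableFuture<String> outcome = new CompletableFuture<>();
        outcome.completeExceptionally(new RuntimeException("timeout"));
        return outcome;
    }

    // "Retries" by asking the same handle again, which is all that
    // resubscribing to an already-issued request can do in this model.
    static int failuresSeen(CompletableFuture<String> outcome, int attempts) {
        int failures = 0;
        for (int i = 0; i < attempts; i++) {
            if (outcome.isCompletedExceptionally()) {
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        CompletableFuture<String> outcome = sendOnce();
        int failures = failuresSeen(outcome, 3);
        // prints "requests sent: 1, failures seen: 3"
        System.out.println("requests sent: " + requestsSent.get()
                + ", failures seen: " + failures);
    }
}
```

Three "retries" observe the same failure three times, and only one request ever goes out, which matches the symptom described above.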
vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject )
.retryWhen( error -> {
    return vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject );
})
So then I thought I could use RxJava's retryWhen() operator, which allows me to issue a second observable when the root observable emits an error. The second observable could, I figured, just be the same code above that produced the initial observable in step 1.
But, the retryWhen() operator (see documentation) does not allow this second observable to emit an error without ending the subscription with an error.
So, I'm having trouble figuring out how to set up a potentially infinite retry loop within the first part of this chain.
I must be missing something here, but I have not been able to determine what it is.
vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject )
// imagine that retryWhen() accomplishes an infinite retry
.retryWhen( error -> {
    return vertx.eventBus().<JsonObject>sendObservable( ... );
})
.flatMap( response -> {
    if ( /* response has usable data */ ) {
        // return that data as an observable
        return Observable.from(response.data());
    } else {
        // the response has no usable data:
        // wait for some time, then start the whole process
        // all over again
        return Observable.timer(timeToWait).<WHAT GOES HERE?>;
    }
})
The second problem is how to implement step 3. This seems to me to be like the first problem, only harder to understand, because I don't need to retry the immediate source observable, I need to wait for a bit, then start over at step 1).
Whatever Observable I create would seem to require all of the elements in the chain leading up to this point, which seems like a kind of recursion that should probably be avoided.
Any help or suggestions would be really welcome at this point.
Many thanks to Ben Christensen over on the RxJava Google Group for pointing out the defer() operator, which will generate a new Observable on each subscription. This can then be composed with the standard retry() operator to get infinite retry.
So, the simplest solution to the first problem in my question looks like so:
Observable.defer( () -> vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject ) )
.retry()
...if exponential backoff is required, you can add Observable.timer() with appropriate parameters in the factory method given to the defer() operator.
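The idea of "a fresh request per attempt, with a growing delay in between" can be sketched in plain Java without any RxJava or vert.x types. This is only a model under stated assumptions: the `Callable` plays the role of the factory handed to defer(), a blocking sleep stands in for Observable.timer(), and `DeferredRetryDemo`, `retryWithBackoff`, `pause`, and the delay values are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class DeferredRetryDemo {
    // Delays (in ms) waited between attempts, recorded so they can be inspected.
    static final List<Long> delaysWaited = new ArrayList<>();

    // Blocking wait; a stand-in for a timer in this model.
    static void pause(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    // Calls the factory until it succeeds, doubling the wait after each
    // failure, capped at maxDelayMs.
    static <T> T retryWithBackoff(Callable<T> requestFactory,
                                  long initialDelayMs, long maxDelayMs) {
        long delay = initialDelayMs;
        while (true) {
            try {
                return requestFactory.call(); // a brand-new request every time
            } catch (Exception e) {
                delaysWaited.add(delay);
                pause(delay);
                delay = Math.min(delay * 2, maxDelayMs);
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        String reply = retryWithBackoff(() -> {
            if (attempts.incrementAndGet() < 4) {
                throw new RuntimeException("timeout"); // first three requests fail
            }
            return "reply";
        }, 10, 1000);
        // prints "reply after 4 attempts, delays [10, 20, 40]"
        System.out.println(reply + " after " + attempts.get()
                + " attempts, delays " + delaysWaited);
    }
}
```

The important part is that the retry loop holds the factory rather than the result of one call, so each attempt really sends a new request.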
I am still looking into the second problem.
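One possible way to think about the second problem, offered as a sketch rather than a confirmed solution, is to treat "a response with no usable data" as just another kind of failure of the whole pass, so that the same start-over mechanism covers it and no recursion is needed. The plain-Java model below illustrates that shape; it does not use RxJava or vert.x, and `StartOverDemo`, `NoUsableDataException`, `attempt`, `requestUntilUsable`, and the usability check are all hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

public class StartOverDemo {
    // Signals that a reply arrived but carried no usable data.
    static class NoUsableDataException extends Exception {}

    // Blocking wait; a stand-in for a timer in this model.
    static void pause(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    // One full pass: send a fresh request, then reject an unusable reply.
    static <T> T attempt(Callable<T> requestFactory, Predicate<T> isUsable)
            throws Exception {
        T response = requestFactory.call();
        if (!isUsable.test(response)) {
            throw new NoUsableDataException();
        }
        return response;
    }

    // Repeats the whole pass until it yields usable data.
    static <T> T requestUntilUsable(Callable<T> requestFactory,
                                    Predicate<T> isUsable, long waitMs) {
        while (true) {
            try {
                return attempt(requestFactory, isUsable);
            } catch (NoUsableDataException e) {
                pause(waitMs); // step 3: wait, then start over at step 1
            } catch (Exception e) {
                // request error such as a timeout: retry without waiting
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        String data = requestUntilUsable(() -> {
            int n = attempts.incrementAndGet();
            if (n == 1) {
                throw new RuntimeException("timeout"); // first request fails
            }
            return n == 2 ? "" : "payload";            // second reply is empty
        }, reply -> !reply.isEmpty(), 10);
        // prints "payload after 3 requests"
        System.out.println(data + " after " + attempts.get() + " requests");
    }
}
```

In RxJava terms the analogous move would presumably be to turn the unusable response into an error inside flatMap() and place the retry operator after it, since a retry placed downstream resubscribes to everything upstream, including the deferred source. I have not verified that against the vert.x API.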