RxJava infinite retry with new observable

I am using the Rx-ified API for vertx, and this question has to do with a potentially infinite retry-until-success loop that I would like to implement but am having difficulty with. I'm new to RxJava.

Here's what I'd like to do:

  1. Send a request to another vertx component using the vertx message bus
  2. For as long as I get a timeout waiting for a response, re-issue the request
  3. Once I have a response to the request, inspect the results, and if there is nothing usable, wait for a period and then start all over again at step 1)
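To pin down the intended behaviour of the three steps above, here is a minimal sketch of the same flow written as an ordinary blocking loop in plain Java, without RxJava or vert.x. The `Request` interface, the `inspect` function and the method names are hypothetical stand-ins invented for this illustration; they are not part of either library.

```java
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

class PollUntilUsable {
    /** Hypothetical stand-in for "send a message and wait for the reply". */
    interface Request<T> {
        T send() throws TimeoutException;
    }

    /** Steps 1 and 2: send, and re-send for as long as the request times out. */
    static <T> T sendUntilAnswered(Request<T> request) {
        while (true) {
            try {
                return request.send();
            } catch (TimeoutException e) {
                // No reply in time: loop around and issue a new request.
            }
        }
    }

    /** Step 3: inspect the reply; if nothing is usable, wait and start over at step 1. */
    static <T, R> R pollUntilUsable(Request<T> request,
                                    Function<T, Optional<R>> inspect,
                                    long waitMs) {
        while (true) {
            T response = sendUntilAnswered(request);
            Optional<R> data = inspect.apply(response);
            if (data.isPresent()) {
                return data.get();
            }
            try {
                Thread.sleep(waitMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
    }

    /** Scripted run: requests 1, 2 and 4 time out, request 3 is empty, request 5 has data. */
    static String demo() {
        int[] calls = {0};
        Request<String> request = () -> {
            int n = ++calls[0];
            if (n == 1 || n == 2 || n == 4) {
                throw new TimeoutException("no reply to request " + n);
            }
            return n == 3 ? "" : "payload";
        };
        // An empty reply counts as "nothing usable" in this illustration.
        Function<String, Optional<String>> inspect =
                r -> Optional.of(r).filter(s -> !s.isEmpty());
        String data = pollUntilUsable(request, inspect, 1);
        return data + " after " + calls[0] + " requests";
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "payload after 5 requests"
    }
}
```

The point of the sketch is only the control flow: an inner loop for timeouts, an outer loop for unusable replies. The question is how to express the same thing as a non-blocking Observable chain.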

The first problem

The first problem I encounter is how to accomplish step 2).

If you are familiar with the vert.x Rx api, this is what it means to issue the request in step 1) above:

vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject );

The above code returns an Observable instance that will emit either the response, or an error (if there was a timeout, for example). That Observable will never emit anything ever again (or else it will always emit exactly the same thing every time something subscribes, I don't know which).

RxJava retry operator doesn't seem to work

vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject )
     .retry()

I thought that in order to issue the retry, I could just use RxJava's retry() operator, which I tried, but doing so has exactly no useful effect whatsoever, due to the nature of the source observable. No new request message gets sent, because the only thing that is being "retried" is the subscription to the original source, which is never going to emit anything different.
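The behaviour described above can be modelled in plain Java, outside RxJava. This is only a sketch under the assumption stated in the question, namely that the request is sent once and its single outcome is replayed to every subscriber. `sendOnce` and `countFailedReads` are hypothetical names made up for the illustration.

```java
import java.util.function.Supplier;

class ReplayedOutcome {
    /** Performs the request once, immediately, and returns a handle that replays that one outcome. */
    static <T> Supplier<T> sendOnce(Supplier<T> request) {
        T value = null;
        RuntimeException error = null;
        try {
            value = request.get();
        } catch (RuntimeException e) {
            error = e;
        }
        final T replayValue = value;
        final RuntimeException replayError = error;
        return () -> {
            if (replayError != null) {
                throw replayError;
            }
            return replayValue;
        };
    }

    /** Reads the handle up to maxAttempts times and returns how many reads failed. */
    static int countFailedReads(Supplier<?> handle, int maxAttempts) {
        int failures = 0;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                handle.get();
                break; // success: stop retrying
            } catch (RuntimeException e) {
                failures++;
            }
        }
        return failures;
    }

    /** Returns {requests actually sent, failed reads} for a request that only times out the first time. */
    static int[] demo() {
        int[] requestsSent = {0};
        Supplier<String> flakyRequest = () -> {
            if (++requestsSent[0] == 1) {
                throw new IllegalStateException("timeout");
            }
            return "reply";
        };
        Supplier<String> handle = sendOnce(flakyRequest);
        int failedReads = countFailedReads(handle, 5);
        return new int[] {requestsSent[0], failedReads};
    }

    public static void main(String[] args) {
        int[] result = demo();
        // prints "requests sent: 1, failed reads: 5"
        System.out.println("requests sent: " + result[0] + ", failed reads: " + result[1]);
    }
}
```

A second request would have succeeded here, but it is never sent: every "retry" only re-reads the stored timeout.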

RxJava retryWhen operator doesn't seem to work

vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject )
     .retryWhen( error -> {
        return vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject );
      })

So then I thought I could use RxJava's retryWhen() operator, which allows me to issue a second observable when the root observable emits an error. The second observable could, I figured, just be the same code above that produced the initial observable in step 1.

But, the retryWhen() operator (see documentation) does not allow this second observable to emit an error without ending the subscription with an error.

So, I'm having trouble figuring out how to set up a potentially infinite retry loop within the first part of this chain.

I must be missing something here, but I have not been able to determine what it is.

The second problem

vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject )
     // imagine that retryWhen() accomplishes an infinite retry
     .retryWhen( error -> {
        return vertx.eventBus().<JsonObject>sendObservable( ... );
      })
     .flatMap( response -> {
        // inspect response, if it has usable data,
        // return that data as an observable
        return Observable.from(response.data());

        // if the response has no usable data,
        // wait for some time, then start the whole process
        // all over again
        return Observable.timer(timeToWait).<WHAT GOES HERE?>;
     })

The second problem is how to implement step 3. This seems to me to be like the first problem, only harder to understand, because I don't need to retry the immediate source observable, I need to wait for a bit, then start over at step 1).

Whatever Observable I create would seem to require all of the elements in the chain leading up to this point, which seems like a kind of recursion that should probably be avoided.

Any help or suggestions would be really welcome at this point.

Hoobajoob asked Sep 27 '22 09:09



1 Answer

Many thanks to Ben Christensen over on the RxJava Google Group for pointing out the defer() operator, which will generate a new Observable on each subscription. This can then be composed with the standard retry() operator to get infinite retry.

So, the simplest solution to the first problem in my question looks like so:

Observable.defer( () -> vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject ) )
.retry()

If exponential backoff is required, you can add Observable.timer() with appropriate parameters inside the factory function passed to the defer() operator.
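For readers who want the shape of the idea without the Rx operators, here is a rough plain-Java sketch of "new request per attempt, with a growing delay between attempts". It is not RxJava code and not the vert.x API. `retryWithBackoff`, its parameters and the `sleeper` hook are hypothetical names chosen for this illustration; the supplier plays the role of the factory given to defer().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

class BackoffRetry {
    /**
     * Calls requestFactory.get() until it returns without throwing.
     * After each failure it waits via sleeper, starting at initialDelayMs
     * and doubling each time, capped at maxDelayMs.
     */
    static <T> T retryWithBackoff(Supplier<T> requestFactory,
                                  long initialDelayMs,
                                  long maxDelayMs,
                                  LongConsumer sleeper) {
        long delay = initialDelayMs;
        while (true) {
            try {
                // A fresh request is created on every attempt.
                return requestFactory.get();
            } catch (RuntimeException e) {
                sleeper.accept(delay);
                delay = Math.min(delay * 2, maxDelayMs);
            }
        }
    }

    /** Scripted run: the first three requests time out, the fourth succeeds. */
    static String demo() {
        int[] sent = {0};
        List<Long> waits = new ArrayList<>();
        String reply = retryWithBackoff(() -> {
            if (++sent[0] < 4) {
                throw new IllegalStateException("timeout");
            }
            return "reply";
        }, 100, 250, ms -> waits.add(ms)); // records the delays instead of sleeping
        return reply + " after " + sent[0] + " requests, waits " + waits;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "reply after 4 requests, waits [100, 200, 250]"
    }
}
```

The demo records the delays rather than sleeping so that it runs instantly. A real caller would pass a sleeper that actually waits, for example one wrapping Thread.sleep and handling InterruptedException.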

I am still looking into the second problem.

Hoobajoob answered Sep 30 '22 06:09
