
Observable's doOnError correct location

Tags: java, rx-java

I am fairly new to Observables and am still trying to figure them out. I have the following piece of code:

observableKafka.getRealTimeEvents()
        .filter(this::isTrackedAccount)
        .filter(e -> LedgerMapper.isDepositOrClosedTrade((Transaction) e.getPayload()))
        .map(ledgerMapper::mapLedgerTransaction)
        .map(offerCache::addTransaction)
        .filter(offer -> offer != null)  // Offer may have been removed from cache since last check
        .filter(Offer::isReady)
        .doOnError(throwable -> {
            LOG.info("Exception thrown on realtime events");
        })
        .forEach(awardChecker::awardFailOrIgnore);

getRealTimeEvents() returns an Observable<Event>.

Does the location of .doOnError matter? Also, what is the effect of adding more than one call to it in this piece of code? I have noticed that I can add several and all of them get invoked, but I am not sure what the purpose of that would be.

Tavo asked Apr 27 '15 13:04



2 Answers

Yes, it does. doOnError acts when an error passes through the stream at that specific point, so if an operator before doOnError throws, your action will be called. However, if you place doOnError further up the chain, it only sees errors raised above it; an error raised by an operator further down never reaches it.

Given a no-op Observer such as

Observer<Object> ignore = new Observer<Object>() {
    @Override public void onCompleted() {
    }
    @Override public void onError(Throwable e) {
    }
    @Override public void onNext(Object t) {
    }
};

the following code will always call doOnError, because the error originates upstream of it:

Observable.<Object>error(new Exception()).doOnError(e -> log(e)).subscribe(ignore);

However, this code won't, because the error is created inside flatMap, downstream of doOnError:

Observable.just(1).doOnError(e -> log(e))
        .flatMap(v -> Observable.<Integer>error(new Exception())).subscribe(ignore);

Most operators bounce exceptions that originate downstream back down the chain rather than passing them up, so an upstream doOnError does not see them.
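
To make the direction of error flow concrete, here is a minimal sketch in plain Java. It is a toy model, not RxJava: the Source interface and its map and doOnError methods are hypothetical stand-ins that only mimic the rule described above, namely that an error travels from the operator that raised it toward the subscriber and never back upstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Toy model only: Source, map and doOnError are hypothetical stand-ins, not RxJava classes.
class DoOnErrorPosition {

    interface Source<T> {
        void subscribe(Consumer<T> onNext, Consumer<Throwable> onError);

        // Failures thrown by fn are sent downstream, to the subscriber's onError.
        default <R> Source<R> map(Function<T, R> fn) {
            return (onNext, onError) -> subscribe(item -> {
                R result;
                try {
                    result = fn.apply(item);
                } catch (RuntimeException e) {
                    onError.accept(e);
                    return;
                }
                onNext.accept(result);
            }, onError);
        }

        // Runs the action on errors arriving from upstream, then passes them on unchanged.
        default Source<T> doOnError(Consumer<Throwable> action) {
            return (onNext, onError) -> subscribe(onNext, e -> {
                action.accept(e);
                onError.accept(e);
            });
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Source<Integer> just = (onNext, onError) -> onNext.accept(1);
        just.doOnError(e -> log.add("before map"))   // upstream of the failure: never invoked
            .<Integer>map(v -> { throw new IllegalStateException("boom"); })
            .doOnError(e -> log.add("after map"))    // downstream of the failure: invoked
            .subscribe(v -> log.add("value " + v), e -> log.add("subscriber"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch only the doOnError placed after the failing map records anything, followed by the subscriber's own error handler. Real RxJava operators also deal with unsubscription and completion, which the toy model leaves out.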

Adding multiple doOnError calls makes sense if you transform the exception in between, for example via onErrorResumeNext or onExceptionResumeNext:

Observable.<Object>error(new RuntimeException())
        .doOnError(e -> log(e))
        .onErrorResumeNext(Observable.<Object>error(new IllegalStateException()))
        .doOnError(e -> log(e)).subscribe(ignore);

Otherwise, you would log the same exception at several points in the chain.

akarnokd answered Oct 17 '22 20:10



The doOn* methods are there for side effects: processing that is not really part of your core business logic, so to speak. Logging is a perfectly fine use for them. That said, sometimes you want to do something more meaningful with an error, such as retrying or displaying a message to the user. For these cases, the "Rx" way is to handle the error in the subscribe call.

doOnError (and the other doOn* methods) wraps the original Observable in a new one and adds behavior to it (around its onError method, obviously). That's why you can call it many times. Another benefit of being able to call it anywhere in the chain is that you can observe errors that would otherwise be hidden from the consumer of the stream (the Subscriber), for instance because there is a retry further down the chain.
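
As a rough illustration of that last point, the sketch below uses a toy model in plain Java rather than RxJava itself. The Source interface, its doOnError and retry methods, and the flaky source are all hypothetical stand-ins, written only to show how a wrapper placed above a retry can see errors that the subscriber never receives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: Source, doOnError and retry are hypothetical stand-ins, not RxJava classes.
class HiddenErrors {

    interface Source<T> {
        void subscribe(Consumer<T> onNext, Consumer<Throwable> onError);

        // Wraps this source: the returned source adds a side effect around onError.
        default Source<T> doOnError(Consumer<Throwable> action) {
            return (onNext, onError) -> subscribe(onNext, e -> {
                action.accept(e);
                onError.accept(e);
            });
        }

        // Resubscribes to this source on error, up to maxRetries times,
        // then forwards the last error downstream.
        default Source<T> retry(int maxRetries) {
            return (onNext, onError) -> attempt(this, onNext, onError, maxRetries);
        }
    }

    static <T> void attempt(Source<T> source, Consumer<T> onNext,
                            Consumer<Throwable> onError, int retriesLeft) {
        source.subscribe(onNext, e -> {
            if (retriesLeft > 0) {
                attempt(source, onNext, onError, retriesLeft - 1);
            } else {
                onError.accept(e);
            }
        });
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        int[] subscriptions = {0};
        // Fails on the first two subscriptions, then emits a single value.
        Source<String> flaky = (onNext, onError) -> {
            if (++subscriptions[0] <= 2) {
                onError.accept(new IllegalStateException("attempt " + subscriptions[0]));
            } else {
                onNext.accept("ok");
            }
        };
        flaky.doOnError(e -> log.add("above retry saw: " + e.getMessage()))
             .retry(3)
             .doOnError(e -> log.add("below retry saw: " + e.getMessage()))
             .subscribe(v -> log.add("value: " + v),
                        e -> log.add("subscriber saw: " + e.getMessage()));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the doOnError above the retry records both failed attempts, while the one below the retry and the subscriber's error handler record nothing, because the third attempt succeeds and the earlier errors are absorbed by the retry.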

Simon Baslé answered Oct 17 '22 18:10
