I am kind of new to Observers
, and I am still trying to figure them out. I have the following piece of code:
observableKafka.getRealTimeEvents()
.filter(this::isTrackedAccount)
.filter(e -> LedgerMapper.isDepositOrClosedTrade((Transaction) e.getPayload()))
.map(ledgerMapper::mapLedgerTransaction)
.map(offerCache::addTransaction)
.filter(offer -> offer != null) // Offer may have been removed from cache since last check
.filter(Offer::isReady)
.doOnError(throwable -> {
LOG.info("Exception thrown on realtime events");
})
.forEach(awardChecker::awardFailOrIgnore);
getRealTimeEvents()
returns an Observable<Event>
.
Does the location of .doOnError
matters? Also, what is the effect of adding more than one call to it in this piece of code? I have realised I can do it and all of them get invoked, but I am not sure of what could be its purpose.
Yes, it does. doOnError
acts when an error is passing through the stream at that specific point, so if the operator(s) before doOnError
throw(s), your action will be called. However, if you place the doOnError
further up, it may or may not be called depending on what downstream operators are in the chain.
Given
Observer<Object> ignore = new Observer<Object>() {
@Override public void onCompleted() {
}
@Override public void onError(Throwable e) {
}
@Override public void onNext(Object t) {
}
};
For example, the following code will always call doOnError:
Observable.<Object>error(new Exception()).doOnError(e -> log(e)).subscribe(ignore);
However, this code won't:
Observable.just(1).doOnError(e -> log(e))
.flatMap(v -> Observable.<Integer>error(new Exception())).subscribe(ignore);
Most operators will bounce back exceptions that originate downstream.
Adding multipe doOnError
is viable if you transform an exception via onErrorResumeNext
or onExceptionResumeNext
:
Observable.<Object>error(new RuntimeException())
.doOnError(e -> log(e))
.onErrorResumeNext(Observable.<Object>error(new IllegalStateException()))
.doOnError(e -> log(e)).subscribe(ignore);
otherwise, you'd log the same exception at multiple locations of the chain.
the doOn???
methods are there for side-effects, processing that doesn't really is your core business value let's say. Logging is a perfectly fine use for that.
That said, sometimes you want to do something more meaningful with an error, like retrying, or displaying a message to a user, etc... For these cases, the "rx" way would be to process the error in a subscribe
call.
doOnError
(and the other doOn
methods) wraps the original Observable
into a new one and adds behavior to it (around its onError
method, obviously). That's why you can call it many times. Also one benefit of being able to call it anywhere in the chain is that you can access errors that would otherwise be hidden from the consumer of the stream (the Subscriber
), for instance because there's a retry down in the chain...
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With