I'm using RxJava, and I need to do 2 things:
- Get the last element emitted by the Observable
- Determine whether onError was called, vs. onCompleted
I've looked at using last and lastOrDefault (which is actually the behavior I need), but I've not been able to work around onError hiding the last element. I would be OK with the Observable being used twice, once to get the last value and once to get the completion status, but so far I've only been able to accomplish this by creating my own Observer:
    import java.util.Optional;
    import java.util.concurrent.atomic.AtomicReference;

    import rx.Observer; // RxJava 1.x

    public class CacheLastObserver<T> implements Observer<T> {
        // Most recent value seen in onNext; stays null if nothing was emitted
        private final AtomicReference<T> lastMessageReceived = new AtomicReference<>();
        // Set only if the stream terminated with onError
        private final AtomicReference<Throwable> error = new AtomicReference<>();

        @Override
        public void onCompleted() {
            // Do nothing
        }

        @Override
        public void onError(Throwable e) {
            error.set(e);
        }

        @Override
        public void onNext(T message) {
            lastMessageReceived.set(message);
        }

        public Optional<T> getLastMessageReceived() {
            return Optional.ofNullable(lastMessageReceived.get());
        }

        public Optional<Throwable> getError() {
            return Optional.ofNullable(error.get());
        }
    }
I've no problem with making my own Observer, but it feels like Rx should be better able to meet this use-case of "get the last element emitted before completion". Any ideas on how to accomplish this?
Try this:
source.materialize().buffer(2).last()
In the error case, the last emission will be a list of two items: the last value emitted, wrapped as a Notification, and the error notification. Without the error, the second item will be the completion notification.
Note also that if no value is emitted, the result will be a list of one item: the terminal notification.
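One thing worth checking before relying on this: buffer(2) groups notifications into non-overlapping pairs, so the terminal notification only shares a pair with the last value when the source emits an odd number of values. The following is a rough plain-Java model of that behavior, with no RxJava dependency. The class and method names are hypothetical, notifications are modeled as strings, and lastTwo stands in for an alternative such as source.materialize().takeLast(2).toList(), which is an assumption of this sketch and not part of the answer above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java model; it does not use RxJava.
// "N(x)" models an onNext notification, "C" onCompleted, "E" onError.
final class LastBeforeTerminalSketch {

    // Models materialize(): each value becomes a notification,
    // followed by exactly one terminal notification.
    static List<String> materialize(List<Integer> values, boolean failed) {
        List<String> out = new ArrayList<>();
        for (int v : values) {
            out.add("N(" + v + ")");
        }
        out.add(failed ? "E" : "C");
        return out;
    }

    // Models buffer(2).last(): split into non-overlapping chunks of two
    // and keep only the final (possibly partial) chunk.
    static List<String> lastChunkOfTwo(List<String> notifications) {
        int start = ((notifications.size() - 1) / 2) * 2;
        return new ArrayList<>(notifications.subList(start, notifications.size()));
    }

    // Models takeLast(2).toList(): keep the final two notifications,
    // regardless of how many came before them.
    static List<String> lastTwo(List<String> notifications) {
        int start = Math.max(0, notifications.size() - 2);
        return new ArrayList<>(notifications.subList(start, notifications.size()));
    }

    public static void main(String[] args) {
        List<String> odd = materialize(Arrays.asList(1, 2, 3), true);
        List<String> even = materialize(Arrays.asList(1, 2), true);
        System.out.println("odd,  chunks of two: " + lastChunkOfTwo(odd));
        System.out.println("even, chunks of two: " + lastChunkOfTwo(even));
        System.out.println("even, last two:      " + lastTwo(even));
    }
}
```

In this model, three values followed by an error yield the last value plus the error under both approaches, while two values followed by an error yield only the error under chunking. Taking the last two notifications keeps the last value in both cases, and still yields a single terminal notification when nothing was emitted.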
I solved with:
source.materialize().withPrevious().last()
where withPrevious is (Kotlin):
    fun <T> Observable<T>.withPrevious(): Observable<Pair<T?, T>> =
        this.scan(Pair<T?, T?>(null, null)) { previous, current -> Pair(previous.second, current) }
            .skip(1)
            .map { it as Pair<T?, T> }