Rx: How to get the last element even if onError was called?

I'm using RxJava, and I need to do 2 things:

  • Get the last element emitted from the Observable
  • Determine if onError was called, vs. onCompleted

I've looked at using last and lastOrDefault (which is actually the behavior I need), but I've not been able to work around onError hiding the last element. I would be OK with the Observable being used twice, once to get the last value and once to get the completion status, but so far I've only been able to accomplish this by creating my own Observer:

public class CacheLastObserver<T> implements Observer<T> {

    private final AtomicReference<T> lastMessageReceived = new AtomicReference<>();
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    @Override
    public void onCompleted() {
        // Do nothing
    }

    @Override
    public void onError(Throwable e) {
        error.set(e);
    }

    @Override
    public void onNext(T message) {
        lastMessageReceived.set(message);
    }

    public Optional<T> getLastMessageReceived() {
        return Optional.ofNullable(lastMessageReceived.get());
    }

    public Optional<Throwable> getError() {
        return Optional.ofNullable(error.get());
    }
}

I've no problem with making my own Observer, but it feels like Rx should be better able to meet this use-case of "get the last element emitted before completion". Any ideas on how to accomplish this?

asked Jun 09 '16 19:06 by cdeszaq


2 Answers

Try this:

source.materialize().buffer(2).last()

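materialize() wraps every value and the terminal event in a Notification, and buffer(2) groups those notifications into non-overlapping pairs. When the final pair is full, it holds the last value emitted (as an onNext Notification) followed by the terminal Notification: the error in the failure case, the completion otherwise.

Note that the final pair is only full when the source emits an odd number of values. With an even number of values, including none at all, the final list holds just the terminal Notification. If the count is not known in advance, source.materialize().takeLast(2).toList() should return the last two notifications regardless of parity.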
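
To check how buffer(2) chunks the materialized stream, here is a plain-Java model with no RxJava dependency. It is only a sketch: the class and method names are made up for illustration, notifications are represented as strings, and the takeLast(2).toList() variant is modelled for comparison rather than taken from the answer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model, not RxJava code: it only mimics the ordering of events.
class MaterializeBufferModel {

    // Models materialize(): one entry per value, then exactly one terminal entry.
    static List<String> materialize(List<String> values, boolean failed) {
        List<String> out = new ArrayList<>();
        for (String v : values) {
            out.add("OnNext(" + v + ")");
        }
        out.add(failed ? "OnError" : "OnCompleted");
        return out;
    }

    // Models buffer(2).last(): cut into non-overlapping chunks of two
    // (a shorter final chunk is still emitted) and keep the final chunk.
    static List<String> bufferTwoLast(List<String> notifications) {
        List<String> last = new ArrayList<>();
        for (int i = 0; i < notifications.size(); i += 2) {
            int end = Math.min(i + 2, notifications.size());
            last = new ArrayList<>(notifications.subList(i, end));
        }
        return last;
    }

    // Models takeLast(2).toList(): the final two notifications, or fewer if
    // the stream is shorter than two.
    static List<String> takeLastTwo(List<String> notifications) {
        int n = notifications.size();
        return new ArrayList<>(notifications.subList(Math.max(0, n - 2), n));
    }

    public static void main(String[] args) {
        List<String> odd = materialize(Arrays.asList("a", "b", "c"), true);
        List<String> even = materialize(Arrays.asList("a", "b"), true);

        System.out.println(bufferTwoLast(odd));   // [OnNext(c), OnError]
        System.out.println(bufferTwoLast(even));  // [OnError]
        System.out.println(takeLastTwo(even));    // [OnNext(b), OnError]
    }
}
```

With three values the final chunk carries both the last value and the error. With two values the last value ends up in the previous chunk, which is why the model's takeLastTwo is shown alongside.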

answered Sep 25 '22 10:09 by Dave Moten


I solved with:

source.materialize().withPrevious().last()

where withPrevious is (Kotlin):

fun <T> Observable<T>.withPrevious(): Observable<Pair<T?, T>> =
    this.scan(Pair<T?, T?>(null, null)) { previous, current -> Pair(previous.second, current) }
        .skip(1)
        .map { it as Pair<T?, T> }
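
For readers following along in Java, the pairing that the scan/skip(1) combination performs can be modelled without RxJava. This is a sketch under assumptions: WithPreviousModel is a hypothetical name, Map.Entry stands in for Kotlin's Pair, and notifications are plain strings.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical model of withPrevious(): each item is paired with its predecessor.
class WithPreviousModel {

    // The first item has no predecessor, so its pair carries null on the left,
    // matching the (null, null) seed that skip(1) drops in the Kotlin version.
    static <T> List<Map.Entry<T, T>> withPrevious(List<T> items) {
        List<Map.Entry<T, T>> pairs = new ArrayList<>();
        T previous = null;
        for (T current : items) {
            pairs.add(new SimpleEntry<>(previous, current));
            previous = current;
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> pairs =
                withPrevious(Arrays.asList("OnNext(a)", "OnNext(b)", "OnError"));

        // Equivalent of .last(): the final pair holds the last value and the terminal event.
        Map.Entry<String, String> last = pairs.get(pairs.size() - 1);
        System.out.println(last.getKey() + " -> " + last.getValue()); // OnNext(b) -> OnError
    }
}
```

Because every item is paired with the one before it, the final pair does not depend on how many values were emitted; if the source emits none, the left side of the pair is null.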

answered Sep 25 '22 10:09 by maurocchi