Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Lazy fetching of paginated objects using RxJava

I'm almost sold to RxJava, which is a perfect companion to Retrofit, but I'm struggling into a common pattern while migrating my code: to save bandwidth, I'd like to lazily fetch (paginated) objects from my webservice as needed, while my listview (or recyclerview) is scrolling using reactive programming.

My previous code was doing the job perfectly, but reactive programming seems worth the try.

Listening to listview/recyclerview scrolling (and other boring stuffs) isn't the concern and getting an Observable is easy using Retrofit:

@GET("/api/messages")
Observable<List<Message>> getMessages(@Path("offset") int offset, @Path("limit") int limit);

I just can't figure out the pattern to use in reactive programming.

The Concat operator seems a good starting point, along with ConnectableObservable at some point to defer emission and maybe flatMap, but how ?

EDIT:

Here's my current (naive) solution:

public interface Paged<T> {
    boolean isLoading();
    void cancel();
    void next(int count);
    void next(int count, Scheduler scheduler);
    Observable<List<T>> asObservable();
    boolean hasCompleted();
    int position();
}

And my implementation using a subject:

public abstract class SimplePaged<T> implements Paged<T> {

    final PublishSubject<List<T>> subject = PublishSubject.create();
    private volatile boolean loading;
    private volatile int offset;
    private Subscription subscription;

    @Override
    public boolean isLoading() {
        return loading;
    }

    @Override
    public synchronized void cancel() {
        if(subscription != null && !subscription.isUnsubscribed())
            subscription.unsubscribe();

        if(!hasCompleted())
            subject.onCompleted();

        subscription = null;
        loading = false;
    }

    @Override
    public void next(int count) {
        next(count, null);
    }

    @Override
    public synchronized void next(int count, Scheduler scheduler) {
        if (isLoading())
            throw new IllegalStateException("you can't call next() before onNext()");

        if(hasCompleted())
            throw new IllegalStateException("you can't call next() after onCompleted()");

        loading = true;

        Observable<List<T>> obs = onNextPage(offset, count).single();

        if(scheduler != null)
            obs = obs.subscribeOn(scheduler); // BEWARE! onNext/onError/onComplete will happens on that scheduler!

        subscription = obs.subscribe(this::onNext, this::onError, this::onComplete);
    }

    @Override
    public Observable<List<T>> asObservable() {
        return subject.asObservable();
    }

    @Override
    public boolean hasCompleted() {
        return subject.hasCompleted();
    }

    @Override
    public int position() {
        return offset;
    }

    /* Warning: functions below may be called from another thread */
    protected synchronized void onNext(List<T> items) {
        if (items != null)
            offset += items.size();

        loading = false;

        if (items == null || items.size() == 0)
            subject.onCompleted();
        else
            subject.onNext(items);
    }

    protected synchronized void onError(Throwable t) {
        loading = false;
        subject.onError(t);
    }

    protected synchronized void onComplete() {
        loading = false;
    }

    abstract protected Observable<List<T>> onNextPage(int offset, int count);

}
like image 680
Renaud Cerrato Avatar asked Feb 10 '15 09:02

Renaud Cerrato


1 Answers

Here's one out of a few potential ways to handle reactive paging. Let's assume we have a method getNextPageTrigger which returns an Observable emits some event object when the scroll listener (or whatever input) wants a new page to be loaded. In real life it would probably have the debounce operator, but in addition to that we'll make sure we only trigger it after the latest page has loaded.

We also define a method to unwrap the messages from their list:

Observable<Message> getPage(final int page) {
  return service.getMessages(page * PAGE_SIZE, PAGE_SIZE)
      .flatMap(messageList -> Observable.from(messageList));
}

Then we can make the actual fetching logic:

// Start with the first page.
getPage(0)
    // Add on each incremental future page.
    .concatWith(Observable.range(1, Integer.MAX_VALUE)
        // Uses a little trick to get the next page to wait for a signal to load.
        // By ignoring all actual elements emitted and casting, the trigger must
        // complete before the actual page request will be made.
        .concatMap(page -> getNextPageTrigger().limit(1)
            .ignoreElements()
            .cast(Message.class)
            .concatWith(getPage(page)))  // Then subscribe, etc..

This is still missing a couple potentially important things:

1 - This obviously doesn't know when to stop fetching additional pages, which means once it hits the end, depending on what the server returns, it could either keep hitting errors or empty results every time scroll is triggered. Approaches to solving this depend on how you signal to the client that there are no more pages to load.

2 - If you need error retries, I would suggest looking into the retryWhen operator. Otherwise, common network errors could cause an error in a page load to propagate.

like image 123
lopar Avatar answered Oct 22 '22 15:10

lopar