
How to handle paging with RxJava?

Tags:

rx-java

I'm looking into converting my Android app to use RxJava for network requests. I currently access a web service method similar to:

getUsersByKeyword(String query, int limit, int offset)

As I understand it, Observables are a "push" rather than a "pull" interface. So here's how I understand things to work out:

  • app registers with service, getting Observable for query
  • results are pushed to app
  • app deals with results
  • when app wants more results ...?

This is where things break down for me. Previously I would just ask the web service for exactly what I wanted by making the query again with a new offset. But in this case that would involve creating another Observable and subscribing to it, which seems to defeat the point.

How should I handle paging in my app? (It's an Android app, but I don't think that is relevant.)

john the android guy asked Oct 12 '14 01:10


3 Answers

This was a tough one! So we have a network request:
getUsersByKeyword(String query, int limit, int offset)
and this request returns, for example,
List<Result>
If we use Retrofit for networking, the request looks like this:
Observable<List<Result>> getUsersByKeyword(String query, int limit, int offset)
The goal is to get every Result from the server.
So the chain looks like this:

// RxJava 1.x: rx.Observable, rx.Subscriber, rx.functions.Func1, rx.functions.Func2
final int page = 50;    // page size
final int limit = page;
Observable
                .range(0, Integer.MAX_VALUE - 1)
                // Request the pages one at a time, in order.
                .concatMap(new Func1<Integer, Observable<List<Result>>>() {
                    @Override
                    public Observable<List<Result>> call(Integer integer) {
                        // Argument order is (query, limit, offset).
                        return getUsersByKeyword(query, limit, integer * page);
                    }
                })
                // Stop at the first empty page.
                .takeWhile(new Func1<List<Result>, Boolean>() {
                    @Override
                    public Boolean call(List<Result> results) {
                        return !results.isEmpty();
                    }
                })
                // Accumulate the pages into one growing list.
                .scan(new Func2<List<Result>, List<Result>, List<Result>>() {
                    @Override
                    public List<Result> call(List<Result> results, List<Result> results2) {
                        List<Result> list = new ArrayList<>();
                        list.addAll(results);
                        list.addAll(results2);
                        return list;
                    }
                })
                // Keep only the final, complete list.
                .last()
                .subscribe(new Subscriber<List<Result>>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(List<Result> results) {
                        // results holds every Result from all pages.
                    }
                });

Code was TESTED!
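
To make the control flow of the chain above easier to follow, here is a minimal blocking sketch in plain Java of what it computes: request pages at offsets 0, limit, 2 * limit and so on, stop at the first empty page, and concatenate everything. It is not RxJava. FakeUserService, FetchAllPages and the String results are hypothetical stand-ins invented for illustration; only the (query, limit, offset) parameter order comes from the question.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the web service: a fixed in-memory list of users.
final class FakeUserService {
    private final List<String> users = new ArrayList<>();

    FakeUserService(int total) {
        for (int i = 0; i < total; i++) {
            users.add("user" + i);
        }
    }

    // Same parameter order as the question: query, limit, offset.
    // The query is ignored by this fake.
    List<String> getUsersByKeyword(String query, int limit, int offset) {
        int from = Math.min(offset, users.size());
        int to = Math.min(offset + limit, users.size());
        return new ArrayList<>(users.subList(from, to));
    }
}

final class FetchAllPages {
    // Requests pages 0, 1, 2, ... in order and stops at the first empty page,
    // roughly what range -> concatMap -> takeWhile -> scan -> last does.
    static List<String> fetchAll(FakeUserService service, String query, int limit) {
        List<String> all = new ArrayList<>();
        for (int page = 0; ; page++) {
            List<String> results = service.getUsersByKeyword(query, limit, page * limit);
            if (results.isEmpty()) {
                break; // the takeWhile condition
            }
            all.addAll(results); // the scan accumulation
        }
        return all;
    }

    public static void main(String[] args) {
        FakeUserService service = new FakeUserService(120);
        // Pages of 50, 50 and 20 results, then an empty page.
        System.out.println(fetchAll(service, "john", 50).size()); // prints 120
    }
}
```

One difference worth knowing: if the very first page is empty, this loop returns an empty list, whereas in RxJava 1.x last() on an empty stream signals a NoSuchElementException to onError.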

xoxol_89 answered Nov 13 '22 13:11


So, if this is one-way paging, here's a pattern you might try. This code hasn't been run or compiled, but I've tried to over-annotate in order to explain what's going on.

private static final int LIMIT = 50;

// Given: Returns a stream of dummy event objects telling us when
// to grab the next page. This could be from a click or wherever.
Observable<NextPageEvent> getNextPageEvents();

// Given:
// The search query keywords. Each emission here means a new top-level
// request.
Observable<String> queries;

queries.switchMap((query) -> getNextPageEvents()
        // Ignore 'next page' pokes when unable to take them.
        .onBackpressureDrop()
        // Start at page 0 and increment the page number on each event.
        // scan emits its seed (0) right away, which requests the first
        // page, so no extra seeding event is needed.
        .scan(0, (page, event) -> page + 1)
        // Request the page from the server.
        .concatMap((page) -> getUsersByKeyword(query, LIMIT, LIMIT * page)
                // Unroll Observable<List<User>> into Observable<User>
                .concatMap((userList) -> Observable.from(userList))
                .retryWhen(/** Insert your favorite retry logic here. */))
        // Only process new page requests sequentially.
        .subscribeOn(Schedulers.trampoline())
        // Trampoline schedules on the 'current thread', so we make sure that's
        // a background IO thread.
        .subscribeOn(Schedulers.io()));

That should let each 'next page' event trigger a load of the next page's data, without skipping a page if an error occurs while loading one. It also restarts completely at the top level if it receives a new search query. If I (or somebody else?) have time, I'd like to check my assumptions about the trampoline and backpressure, and make sure they block any attempt to prematurely fetch the next page while one is loading.
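
The rules this pattern is meant to enforce can be spelled out as a tiny state machine. The following is a hypothetical, non-Rx sketch (PageCursor and its method names are invented here, not part of RxJava or of the code above), and it simplifies retries by waiting for the next event instead of using retryWhen. It only illustrates the three intended invariants: events are dropped while a load is in flight, the offset is LIMIT * page, and a failed load does not advance the page.

```java
// Hypothetical, non-Rx illustration of the intended paging rules.
final class PageCursor {
    private final int limit;
    private int nextPage = 0;
    private boolean loading = false;

    PageCursor(int limit) {
        this.limit = limit;
    }

    // Called for each 'next page' event. Returns the offset to request,
    // or -1 if a load is already in flight and the event is dropped.
    int onNextPageEvent() {
        if (loading) {
            return -1;
        }
        loading = true;
        return limit * nextPage;
    }

    // Called when the page loaded successfully: move on to the next page.
    void onPageLoaded() {
        loading = false;
        nextPage++;
    }

    // Called when the load failed: stay on the same page so it is not skipped.
    void onPageFailed() {
        loading = false;
    }
}
```

With a limit of 50, the first event yields offset 0, a second event during the load is dropped, a failure followed by another event yields offset 0 again, and a success followed by another event yields offset 50.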

lopar answered Nov 13 '22 15:11


I've done this and it's actually not that hard.

The approach is to model every first request (offset 0) in a firstRequestsObservable. To keep it simple, you can make this a PublishSubject on which you call onNext() to feed in the next request, but there are smarter non-Subject ways of doing it (e.g., if requests are made when a button is clicked, then firstRequestsObservable is the clickObservable mapped through some operators).

Once you have firstRequestsObservable in place, you can make responseObservable by flatMapping from firstRequestsObservable and so forth, to make the service call.

Now here comes the trick: make another observable called subsequentRequestsObservable which is mapped from responseObservable, incrementing the offset (for this purpose it's good to include, in the response data, the offset of the originating request). Once you introduce this observable, you now have to change the definition of responseObservable so that it depends also on subsequentRequestsObservable. You then get a circular dependency like this:

firstRequestsObservable -> responseObservable -> subsequentRequestsObservable -> responseObservable -> subsequentRequestsObservable -> ...

To break this cycle, you probably want to include a filter operator in the definition of subsequentRequestsObservable, filtering out those cases where the offset would pass the "total" limit. The circular dependency also means that one of the observables has to be a Subject; otherwise it would be impossible to declare them. I recommend making responseObservable that Subject.

So, all in all, you first initialize responseObservable as a Subject, then declare firstRequestsObservable, then declare subsequentRequestsObservable as the result of passing responseObservable through some operators. responseObservable can then be "fed in" by using onNext.
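
As a rough illustration of that wiring, here is a self-contained sketch in plain Java. It does not use RxJava: SimpleSubject is a hypothetical, synchronous stand-in for a PublishSubject, and Response, FeedbackPaging and the fake service call are invented for the example. It assumes the total number of results is known up front, and that each response carries the offset of its originating request, as suggested above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical minimal stand-in for a PublishSubject: synchronous,
// with no completion or error handling.
final class SimpleSubject<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    void onNext(T value) {
        for (Consumer<T> subscriber : new ArrayList<>(subscribers)) {
            subscriber.accept(value);
        }
    }
}

// A response remembers the offset of the request that produced it.
final class Response {
    final int offset;
    final int count;

    Response(int offset, int count) {
        this.offset = offset;
        this.count = count;
    }
}

final class FeedbackPaging {
    // Runs the request/response cycle and returns the offsets requested.
    static List<Integer> run(int total, int limit) {
        List<Integer> requestedOffsets = new ArrayList<>();
        SimpleSubject<Integer> firstRequests = new SimpleSubject<>();
        // The Subject that breaks the circular declaration.
        SimpleSubject<Response> responses = new SimpleSubject<>();

        // Fake service call: records the offset, then feeds the response in.
        Consumer<Integer> callService = offset -> {
            requestedOffsets.add(offset);
            int count = Math.max(0, Math.min(limit, total - offset));
            responses.onNext(new Response(offset, count));
        };

        // firstRequests -> responses
        firstRequests.subscribe(callService);

        // responses -> subsequent requests -> responses -> ...
        responses.subscribe(response -> {
            int nextOffset = response.offset + limit; // map: increment the offset
            if (nextOffset < total) {                 // filter: stop at the total
                callService.accept(nextOffset);
            }
        });

        firstRequests.onNext(0); // kick off with offset 0
        return requestedOffsets;
    }

    public static void main(String[] args) {
        System.out.println(run(120, 50)); // prints [0, 50, 100]
    }
}
```

Because this stand-in is synchronous, each response triggers the next request on the same call stack, so the recursion depth grows with the number of pages. With real asynchronous service calls the feedback would arrive on a later callback instead.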

André Staltz answered Nov 13 '22 13:11