I'm looking into converting my android app to use Rxjava for network requests. I currently access a webservice similar to:
getUsersByKeyword(String query, int limit, int offset)
As I understand it, Observables are a "push" rather than a "pull" interface. So here's how I understand things to work out:
This is where things break down for me. Previously I would just ask the webservice for exactly what I want, make the query again with the offset. But in this case that would involve creating another Observable and subscribing to it, kind of defeating the point.
How should I handle paging in my app? (It's an android app, but I don't think that is relevant).
It was hard rock!)
So we have request to network: getUsersByKeyword(String query, int limit, int offset)
and this request return for example List< Result >
If we use RetroFit for networking that request will look: Observable< List< Result >> getUsersByKeyword(String query, int limit, int offset)
As result we want to get all Result
from server.
So it will look like this
int page = 50;
int limit = page;
Observable
.range(0, Integer.MAX_VALUE - 1)
.concatMap(new Func1<Integer, Observable<List<Result>>>() {
@Override
public Observable<List<Result>> call(Integer integer) {
return getUsersByKeyword(query, integer * page, limit);
}
})
.takeWhile(new Func1<List<Result>, Boolean>() {
@Override
public Boolean call(List<Result> results) {
return !results.isEmpty();
}
})
.scan(new Func2< List<Result>, List<Result>, List<Result>>() {
@Override
public List<Result> call(List<Result> results, List< Result> results2) {
List<Result> list = new ArrayList<>();
list.addAll(results);
list.addAll(results2);
return list;
}
})
.last()
.subscribe(new Subscriber<List<Result>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(List<Results> results) {
}
});
Code was TESTED!
So, if this is one-way paging, here's a pattern you might try. This code hasn't been run or compiled, but I've tried to over-annotate in order to explain what's going on.
private static final int LIMIT = 50;
// Given: Returns a stream of dummy event objects telling us when
// to grab the next page. This could be from a click or wherever.
Observable<NextPageEvent> getNextPageEvents();
// Given:
// The search query keywords. Each emission here means a new top-level
// request;
Observable<String> queries;
queries.switchMap((query) -> getNextPageEvents()
// Ignore 'next page' pokes when unable to take them.
.onBackPressureDrop()
// Seed with the first page.
.startWith(new NextPageEvent())
// Increment the page number on each request.
.scan(0, (page, event) -> page + 1)
// Request the page from the server.
.concatMap((page) -> getUsersByKeyword(query, LIMIT, LIMIT * page)
// Unroll Observable<List<User> into Observable<User>
.concatMap((userList) -> Observable.from(userList))
.retryWhen(/** Insert your favorite retry logic here. */))
// Only process new page requests sequentially.
.scheduleOn(Schedulers.trampoline())
// Trampoline schedules on the 'current thread', so we make sure that's
// a background IO thread.
.scheduleOn(Schedulers.io());
That should let the 'next page events' signal trigger a load of the next page's data each time, as well as not jumping pages should it encounter an error loading one. It also restarts completely at the top level if it receives a new search query. If I (or somebody else?) has time, I'd like to check my assumptions about the trampoline and backpressure and make sure it blocks any attempt to prematurely fetch the next page while one is loading.
I've done this and it's actually not that hard.
The approach is to model every first request (offset 0) in a firstRequestsObservable. To make it easy, you can make this as a PublishSubject where you call onNext()
to feed in the next request, but there are smarter non-Subject ways of doing it (e.g., if requests are done when a button is clicked, then the requestObservable is the clickObservable mapped through some operators).
Once you have firstRequestsObservable
in place, you can make responseObservable
by flatMapping from firstRequestsObservable
and so forth, to make the service call.
Now here comes the trick: make another observable called subsequentRequestsObservable
which is mapped from responseObservable
, incrementing the offset (for this purpose it's good to include, in the response data, the offset of the originating request). Once you introduce this observable, you now have to change the definition of responseObservable
so that it depends also on subsequentRequestsObservable
. You then get a circular dependency like this:
firstRequestsObservable -> responseObservable -> subsequentRequestsObservable -> responseObservable -> subsequentRequestsObservable -> ...
To break this cycle, you probably want to include a filter
operator in the definition of subsequentRequestsObservable
, filtering out those cases where the offset would pass the "total" limit. The circular dependency also means that you need to have one of those being a Subject, otherwise it would be impossible to declare the observables. I recommend responseObservable
to be that Subject.
So, all in all, you first initialize responseObservable as a Subject, then declare firstRequestsObservable, then declare subsequentRequestsObservable as the result of passing responseObservable through some operators. responseObservable can then be "fed in" by using onNext
.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With