I've got a pretty standard API pagination problem which you can handle with some simple recursion. Here's a fabricated example:
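public Observable<List<Result>> scan() {
    return scanPage(Optional.empty(), ImmutableList.of());
}

private Observable<List<Result>> scanPage(Optional<KEY> startKey, List<Result> results) {
    return this.scanner.scan(startKey, LIMIT)
        .flatMap(page -> {
            // Add this page's results before deciding whether to continue,
            // so the final page is not dropped
            final List<Result> accumulated = ImmutableList.<Result>builder()
                .addAll(results)
                .addAll(page.getResults())
                .build();
            if (!page.getLastKey().isPresent()) {
                // No further pages: emit everything collected so far
                return Observable.just(accumulated);
            }
            // Recurse, using the last key as the next start key
            return scanPage(page.getLastKey(), accumulated);
        });
}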
But this can obviously create a massive callstack. How can I do this imperatively but maintain the Observable stream?
Here's an imperative blocking example:
public List<Result> scan() {
    Optional<String> startKey = Optional.empty();
    final ImmutableList.Builder<Result> results = ImmutableList.builder();
    do {
        final Page page = this.scanner.scan(startKey);
        startKey = page.getLastKey();
        results.addAll(page.getResults());
    } while (startKey.isPresent());
    return results.build();
}
JohnWowUs' answer is great and helped me understand how to avoid the recursion effectively, but there were some points I was still confused about, so I'm posting my tweaked version.
Summary:
- Each page is fetched as a Single.
- Use a Flowable to stream each of the items contained in the pages. This means callers of our function do not need to know about the individual pages and can just collect the contained items.
- Use a BehaviorProcessor to start with the first page, and fetch each subsequent page once the current page has told us whether a next page is available.
- The call to processor.onNext(int) starts the next iteration.

This code depends on rxjava and reactive-streams.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.processors.BehaviorProcessor;
public class Pagination {

    // Fetch all pages and return the items contained in those pages, using the provided page fetcher function
    public static <T> Flowable<T> fetchItems(Function<Integer, Single<Page<T>>> fetchPage) {
        // Processor issues page indices
        BehaviorProcessor<Integer> processor = BehaviorProcessor.createDefault(0);
        // When an index number is issued, fetch the corresponding page
        return processor.concatMap(index -> fetchPage.apply(index).toFlowable())
            // when returning the page, update the processor to get the next page (or stop)
            .doOnNext(page -> {
                if (page.hasNext()) {
                    processor.onNext(page.getNextPageIndex());
                } else {
                    processor.onComplete();
                }
            })
            .concatMapIterable(Page::getElements);
    }

    public static void main(String[] args) {
        fetchItems(Pagination::examplePageFetcher).subscribe(System.out::println);
    }

    // A function to fetch a page of our paged data
    private static Single<Page<String>> examplePageFetcher(int index) {
        return Single.just(pages.get(index));
    }

    // Create some paged data
    private static ArrayList<Page<String>> pages = new ArrayList<>(3);

    static {
        pages.add(new Page<>(Arrays.asList("one", "two"), Optional.of(1)));
        pages.add(new Page<>(Arrays.asList("three", "four"), Optional.of(2)));
        pages.add(new Page<>(Arrays.asList("five"), Optional.empty()));
    }

    static class Page<T> {
        private List<T> elements;
        private Optional<Integer> nextPageIndex;

        public Page(List<T> elements, Optional<Integer> nextPageIndex) {
            this.elements = elements;
            this.nextPageIndex = nextPageIndex;
        }

        public List<T> getElements() {
            return elements;
        }

        public int getNextPageIndex() {
            return nextPageIndex.get();
        }

        public boolean hasNext() {
            return nextPageIndex.isPresent();
        }
    }
}
Output:
one
two
three
four
five
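The question pages by an optional start key rather than a numeric index, so here is a rough sketch of the same "current page decides whether another page is requested" loop in that shape. To keep it dependency-free it swaps RxJava for a plain, blocking java.util.Iterator, so it illustrates the control flow only and is not a reactive stream. KeyedPagination, KeyedPage, items and fetchExamplePage are all names I made up for the illustration; they are not part of the question's scanner API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Function;

class KeyedPagination {

    // Hypothetical page type: a batch of items plus the key to resume from, if any
    static final class KeyedPage<T> {
        final List<T> items;
        final Optional<String> lastKey;

        KeyedPage(List<T> items, Optional<String> lastKey) {
            this.items = items;
            this.lastKey = lastKey;
        }
    }

    // Lazily walks every page with a loop instead of recursion.
    // A page is only requested once the previous page's items are exhausted.
    static <T> Iterator<T> items(Function<Optional<String>, KeyedPage<T>> fetchPage) {
        return new Iterator<T>() {
            private Optional<String> nextKey = Optional.empty();
            private boolean morePages = true;
            private Iterator<T> current = Collections.emptyIterator();

            @Override
            public boolean hasNext() {
                // Loop past empty pages until an item is found or the pages run out
                while (!current.hasNext() && morePages) {
                    KeyedPage<T> page = fetchPage.apply(nextKey);
                    current = page.items.iterator();
                    nextKey = page.lastKey;
                    morePages = nextKey.isPresent();
                }
                return current.hasNext();
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return current.next();
            }
        };
    }

    // Fabricated data source: each page reports the key the next request should start from
    static KeyedPage<String> fetchExamplePage(Optional<String> startKey) {
        if (!startKey.isPresent()) {
            return new KeyedPage<>(Arrays.asList("one", "two"), Optional.of("two"));
        }
        if (startKey.get().equals("two")) {
            return new KeyedPage<>(Arrays.asList("three", "four"), Optional.of("four"));
        }
        return new KeyedPage<>(Arrays.asList("five"), Optional.empty());
    }

    // Drains the iterator into a list so the result is easy to inspect
    static List<String> collectAll() {
        Iterator<String> it = items(KeyedPagination::fetchExamplePage);
        List<String> out = new ArrayList<>();
        it.forEachRemaining(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collectAll()); // prints [one, two, three, four, five]
    }
}
```

The nextKey field plays the role the BehaviorProcessor plays in the RxJava version: it holds the cursor for the next request, and the absence of a key is what ends the loop.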