I am trying to build an infinitely scrolling list using TypeScript and rxjs. That is, I want the application to fetch a few pages of results from the backend, then fetch more results whenever the user scrolls near the bottom.
I have an Observable, built with Observable.prototype.expand(), which will give me all results, eventually fetching all pages from the server. However, due to the nature of Observables, I cannot pause this process: once I subscribe, it will inevitably fetch all results as fast as possible. I need a different solution, where I can pull from the stream of results at the speed I need.
Things are made more complicated by the fact that I cannot fetch the second page from the API directly; every page contains the info I need to fetch the next one. A reply looks like this:
interface GraphApiResponse {
    data?: any[];
    paging?: {
        cursors: {
            before: string,
            after: string
        },
        next?: string,
        previous?: string
    };
}
The presence of paging.next indicates there is another page, and paging.cursors.after is used to actually retrieve it.
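To make that rule concrete, here is a minimal sketch. The helper name nextCursor and the sample replies are made up for illustration and are not part of the API; only the GraphApiResponse shape comes from above.

```typescript
// Shape of one page, as described above.
interface GraphApiResponse {
    data?: any[];
    paging?: {
        cursors: { before: string; after: string };
        next?: string;
        previous?: string;
    };
}

// Hypothetical helper: returns the cursor to request next,
// or undefined when the reply was the last page.
function nextCursor(reply: GraphApiResponse): string | undefined {
    const paging = reply.paging;
    // paging.next only signals that another page exists;
    // paging.cursors.after is the value that is actually sent.
    return paging && paging.next ? paging.cursors.after : undefined;
}

// Made-up sample replies.
const middle: GraphApiResponse = {
    data: [1, 2],
    paging: { cursors: { before: "a", after: "b" }, next: "https://example.invalid/next" },
};
const last: GraphApiResponse = {
    data: [3],
    paging: { cursors: { before: "c", after: "d" } },
};

console.log(nextCursor(middle)); // b
console.log(nextCursor(last));   // undefined
```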
I cannot seem to figure out how to implement this without making it messy. However, an infinite list seems like such a common problem that it is unlikely there would not be a good solution for it. How should I go about implementing this without turning the thing into a mess?
My first thought was to use an Iterable of Promises. However, I do not know how many results I will get, which forces me to build an infinite Iterable<Promise<Response?>> whose Promises will all resolve to undefined after a certain point. Since it is infinite, I can't iterate over it normally (it would fill the entire available memory with Promises), so actually using the results in that form means getting every Promise in the resolve function of the previous one.
This solution seems like it might work, but with every line I write it gets less readable and more complex.
While googling the issue I found a related SO question as well as a GitHub issue on rxjs backpressure. Both contain code snippets by Ben Lesh that can apparently be used to add backpressure to an Observable. Sadly, no matter what I try, I cannot get the source Observable to emit its values slower than it generates them; they always just get buffered somewhere, which means that the network requests will happen no matter what.
From GitHub:
// this behavior subject is basically your "give me the next batch" mechanism.
// in this example, we're going to make 5 async requests back to back before requesting more.
const BATCH_SIZE = 5;
const requests = new BehaviorSubject(BATCH_SIZE); // start by requesting five items
// for every request, pump out a stream of events that represent how many you have left to fulfill
requests.flatMap((count) => Observable.range(0, count).map(n => count - n - 1))
    // then concat map that into an observable of what you want to control with backpressure
    // you might have some parameterization here you need to handle, this example is simplified
    // handle side effects with a `do` block
    .concatMap(() => getSomeObservableOfDataHere().do(stuffWithIt), (remaining) => remaining)
    // narrow it down to when there are no more left to request,
    // and pump another batch request into the BehaviorSubject
    .filter(remaining => remaining === 0)
    .mapTo(BATCH_SIZE)
    .subscribe(requests);
From StackOverflow:
// start with 5 values
const controller = new Rx.BehaviorSubject(5);
// some observable source, in this case, an interval.
const source = Rx.Observable.interval(100);
const controlled = controller.flatMap(
        // map your count into a set of values
        (count) => source.take(count),
        // additional mapping for metadata about when the block is done
        (count, value, _, index) => {
            return { value: value, done: count - index === 1 };
        })
    // when the block is done, request 5 more.
    .do(({done}) => done && controller.next(5))
    // we only care about the value for output
    .map(({value}) => value);
// start our subscription
controlled.subscribe(x => {
    console.log(x)
});
I might be wrong about this, but it seems to me that once I subscribe to an Observable, it produces its values as fast as possible, with no way to slow it down, so this is probably not a solution.
It seems that ixjs is meant to be a solution to my problem; however, that repository has not been updated in a long time. There apparently is a reimplementation in TypeScript, but that seems to be early in development and not well documented yet.
I would rather not depend on a framework used by so few people for what is actually a very simple problem.
I have searched online for implementations of an infinite scrolling list in TypeScript (with Angular). My current approach is having a Service that provides an Object that can be used to get all the results. Then I have a Component showing them. Alternatives seem to be checking the scroll position right in the service that queries the backend, or having the component fetch a new Observable from the backend service when the user scrolls.
Both solutions would force me to mix code that is currently neatly separated. I would prefer having the Service return something that I can just feed into the Component, without the Component having to know about network requests or the Service having to know about scroll position.
I would suggest that you look at the mergeScan operator instead. It seems like it might be a good fit here.
mergeScan is similar to the expand operator in that it feeds the data from the previous request back in as an accumulator, but unlike expand it doesn't continue running until the end of time; it only makes a request when the source emits.
Basically, assuming that you have a function makeRequest(params) which takes a request and returns an Observable that eventually resolves to a response, and a stream representing scroll events which we will call fetchMore$, you can create a fetch-on-demand service like so:
// This is abstracted as a simple "fetch" concept but in reality should
// be hooked up to your scroll handler, properly debounced etc.
this.fetchMore$
    .mergeScan(
        // Make the request.
        // Note: once paging.next is absent this falls back to '', i.e. the
        // first page again, so stop emitting on fetchMore$ after the last page.
        (acc, _) => makeRequest(acc.paging.next ? acc.paging.cursors.after : ''),
        {paging: {}}, // Initial accumulator, so the first request uses ''
        1 // Maximum concurrency, i.e. how many requests can be in flight at once
    )
    .pluck('data')
    .subscribe(data => {/*Do something with the data*/});
I set the concurrency to 1 because while you could have multiple requests in flight, there is currently no guarantee of order, so the result could be that the acc gets out of sync if the user is scrolling really fast, whereas with a concurrency of 1 the data will always be ordered.
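To illustrate why a concurrency of 1 keeps the accumulator in sync, here is a minimal plain-Promise sketch of the same idea. It is not mergeScan itself and does not use RxJS; createPager, fakePages, and the cursor values are hypothetical names invented for this example, and makeRequest is a stand-in that resolves immediately instead of hitting a network. Error handling is left out.

```typescript
interface Page {
    data: string[];
    paging?: { cursors: { before: string; after: string }; next?: string };
}

// Hypothetical stand-in for the backend: three pages keyed by cursor.
const fakePages: Record<string, Page> = {
    "": { data: ["a", "b"], paging: { cursors: { before: "", after: "p2" }, next: "more" } },
    p2: { data: ["c", "d"], paging: { cursors: { before: "p1", after: "p3" }, next: "more" } },
    p3: { data: ["e"], paging: { cursors: { before: "p2", after: "p4" } } },
};
const requested: string[] = []; // records the order in which cursors were requested

function makeRequest(cursor: string): Promise<Page> {
    requested.push(cursor);
    return Promise.resolve(fakePages[cursor]); // a real version would call the API
}

// Every fetchMore() call is chained onto the previous reply, which plays the
// role of the accumulator. Only one request is in flight at a time.
function createPager(request: (cursor: string) => Promise<Page>) {
    let last: Promise<Page | undefined> = Promise.resolve(undefined);
    return function fetchMore(): Promise<string[]> {
        const next: Promise<Page> = last.then((prev): Page | Promise<Page> => {
            if (prev === undefined) return request(""); // first page
            if (prev.paging && prev.paging.next) return request(prev.paging.cursors.after);
            return { data: [] }; // no further pages: do not hit the backend again
        });
        last = next;
        return next.then(page => page.data);
    };
}

const fetchMore = createPager(makeRequest);
// Three "scroll events" fired back to back, plus one after the last page.
const done = Promise.all([fetchMore(), fetchMore(), fetchMore(), fetchMore()]);
done.then(pages => console.log(pages, requested));
```

Even though all four calls are issued before any reply arrives, each request is built from the reply before it, so the cursors are requested in order and the fourth call yields an empty page without contacting the backend.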