Let's imagine i have a function fetchUser which takes as parameter userId and return an observable of user.
As i am calling this method often, i want to batch the ids to perform one request with multiple ids instead !
Here my troubles began...
I can't find a solution to do that without sharing an observable between the different calls of fetchUser.
import { Subject, from } from "rxjs"
import { bufferTime, mergeMap, map, toArray, filter, take, share } from "rxjs/operators"
const functionThatSimulateAFetch = (userIds: string[]) => from(userIds).pipe(
map((userId) => ({ id: userId, name: "George" })),
toArray(),
)
const userToFetch$ = new Subject<string>()
const fetchedUser$ = userToFetch$.pipe(
bufferTime(1000),
mergeMap((userIds) => functionThatSimulateAFetch(userIds)),
share(),
)
const fetchUser = (userId: string) => {
const observable = fetchedUser$.pipe(
map((users) => users.find((user) => user.id === userId)),
filter((user) => !!user),
take(1),
)
userToFetch$.next(userId)
return observable
}
But that's ugly and it has multiple troubles:
fetchUser before the timer of bufferTime is finished, it doesn't prevent the fetch of the user. fetchUser before the fetch of the batch is finished, it doesn't cancel the request.More generally: i don't know how to solve the problems requiring sharing resources using RxJS. It's difficult to find advanced example of RxJS.
What you have is a good, but as with everything RxJS, but the devil is in the details.
switchMaping mergeMap((userIds) => functionThatSimulateAFetch(userIds)),
This is where you first go wrong. By using a merge map here, you are making it impossible to tell appart the "stream of requests" from the "stream returned by a single request":
Rather, what you want is to emit individual BatchEvents, via a normal map (producing an observable of observable), and switchMap/mergeMap those after the filtering.
userToFetch$.next(userId)
return observable
Don’t do this. An observable by itself does not actually do anything. It’s a "blueprint" for a sequence of actions to happen when you subscribe to it. By doing this, you’ll only create a batch action on observable creating, but you’re screwed if you get multiple or delayed subscriptions.
Rather, you want to create an observable from defer that emits to userToFetch$ on every subscription.
Even then you’ll want to subscribe to your observable before emitting to userToFetch: If you aren’t subscribed, your observable is not listening to the subject, and the event will be lost. You can do this in a defer-like observable.
Short, and not very different from your code, but structure it like this.
const BUFFER_TIME = 1000;
type BatchEvent = { keys: Set<string>, values: Observable<Users> };
/** The incoming keys */
const keySubject = new Subject<string>();
const requests: Observable<{ keys: Set<string>, values: Observable<Users> }> =
this.keySubject.asObservable().pipe(
bufferTime(BUFFER_TIME),
map(keys => this.fetchBatch(keys)),
share(),
);
/** Returns a single User from an ID. Batches the request */
function get(userId: string): Observable<User> {
console.log("Creating observable for:", userId);
// The money observable. See "defer":
// triggers a new subject event on subscription
const observable = new Observable<BatchEvent>(observer => {
this.requests.subscribe(observer);
// Emit *after* the subscription
this.keySubject.next(userId);
});
return observable.pipe(
first(v => v.keys.has(userId)),
// There is only 1 item, so any *Map will do here
switchMap(v => v.values),
map(v => v[userId]),
);
}
function fetchBatch(args: string[]): BatchEvent {
const keys = new Set(args); // Do not batch duplicates
const values = this.userService.get(Array.from(keys)).pipe(
share(),
);
return { keys, values };
}
This does exactly what you were asking, including:
shareReplay or whatever though. So no surprises there.Here is a working stackblitz Angular demo: https://stackblitz.com/edit/angular-rxjs-batch-request
In particular, notice the behavior when you "toggle" the display: You’ll notice that re-subscribing to existing observables will fire new batch requests, and that those requests will cancel (or outright not fire) if you re-toggle fast enough.
In our project, we use this for Angular Tables, where each row needs to individually fetch additional data to render. This allows us to:
I would not add chunking or rate limitting into this. Because the source observable is a dumb bufferTime you run into issues:
This is a pessimistic point of view though. Fixing it would mean going full out with a stateful queue/batch mechanism, which is an order of magnitude more complex.
I think @Biggy is right.
This is the way I understand the problem and what you want to achieve
nullIf this is all true, then you probably have to have some sort of queuing mechanism, as Buggy suggested.
Then there may be many implementations of such mechanism.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With