Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to use Firestore realtime updates (onSnapshot) with redux-observable/rxjs?

I am able to setup redux-observable with normal Firestore queries

export const getStatementsEpic = (action$, store) => {
  return action$.ofType(GET_STATEMENTS)
    .filter(() => {
      const state = store.getState()
      return state.auth.user
    })
    .mergeMap(() => {
      console.log('action', store.getState().auth.user.uid)
      const db = firebase.firestore()
      db.settings({ timestampsInSnapshots: true })
      const query = db.collection('users')
        .doc(store.getState().auth.user.uid)
        .collection('statements')
        .orderBy('uploadedOn', 'desc')
        .limit(50)
      return query.get().then(snapshot => {
        console.log('Should have gotten snapshot')
        return getStatementsSnapshot(snapshot)
      })
    })
}

But I want to convert this to be realtime, I tried changing

return query.get().then(snapshot => {

to

return query.onSnapshot(snapshot => {

But it does not work ... I guess this is not a promise? How do I resolve this?

like image 503
Jiew Meng Avatar asked Oct 16 '22 17:10

Jiew Meng


1 Answers

You're correct, the onSnapshot method does not return a promise. Instead, it returns a function that can be used to unsubscribe from change notifications. Until that unsubscribe function is invoked, the callback passed to the onSnapshot method will be called every time the document changes. (The documentation indicates the callback will also be called immediately with the current document contents.)

Functions like onSnapshot that use a callback function multiple times can be "converted" into an observable using the fromEventPattern function. fromEventPattern takes two functions as parameters. The first function you pass needs to call onSnapshot, passing it a RxJS-defined handler as the callback. The second function you pass needs to call the unsubscribe function returned by onSnapshot. RxJS will call the first function when you subscribe to the observable (i.e. use it in your epic). RxJS will call the second function when you unsubscribe from the observable.

Here's an example of your code updated to use fromEventPattern and the new RxJS pipes:

export const getStatementsEpic = (action$, state$) => action$.pipe(
  ofType(GET_STATEMENTS),
  withLatestFrom(state$),
  filter(([action, state]) => state.auth.user),
  mergeMap(([action, state]) => {
    const db = firebase.firestore()
    db.settings({ timestampsInSnapshots: true })
    return fromEventPattern(
      handler => db.collection('users')
        .doc(state.auth.user.uid)
        .collection('statements')
        .orderBy('uploadedOn', 'desc')
        .limit(50)
        .onSnapshot(handler),
      (handler, unsubscribe) => unsubscribe(),
    ).pipe(
      map(getStatementsSnapshot),
      takeUntil(action$.pipe(
        ofType(STOP_GET_STATEMENTS),
      )),
    )
  }),
)

Take note that I've introduced takeUntil to the snapshot stream. Without it (or something like it), the snapshot stream will never end. Another possible change is using switchMap instead of mergeMap. How to unsubscribe just depends on your use case.

like image 149
seniorquico Avatar answered Nov 15 '22 09:11

seniorquico