
Resetting ReplaySubject in RxJS 6

I have a filterable 'activity log' that's currently implemented using a ReplaySubject (since a few components use it and they might subscribe at different times).

When the user changes the filter settings, a new request is made. However, the results are appended to the ReplaySubject rather than replacing its contents.

Is there any way to update the ReplaySubject so that it only sends through the new items, using something like a switchMap?

Otherwise, I might need to either use a BehaviorSubject that emits an array of all the activity entries, or recreate the ReplaySubject and notify consumers (probably by using another observable) to unsubscribe and resubscribe to the new observable.
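
To make the BehaviorSubject alternative above concrete, here is a minimal sketch of its semantics, written without RxJS so that it runs standalone. `LatestValue` is a hypothetical stand-in for `BehaviorSubject<ActivityEntry[]>`, and the `ActivityEntry` shape is an assumption for illustration, not taken from the question. The point it shows: when the whole result set is held as a single array value, a filter change replaces that value rather than appending to it.

```typescript
// Hypothetical stand-in for BehaviorSubject<T>: holds one current value,
// hands it to every new subscriber, and replaces it on next().
type Listener<T> = (value: T) => void;

class LatestValue<T> {
  private listeners = new Set<Listener<T>>();
  private current: T;

  constructor(initial: T) {
    this.current = initial;
  }

  // New subscribers immediately receive the current value.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.add(listener);
    listener(this.current);
    return () => { this.listeners.delete(listener); };
  }

  // Replaces the current value and notifies all subscribers.
  next(value: T): void {
    this.current = value;
    this.listeners.forEach(l => l(value));
  }
}

// Assumed entry shape, for illustration only.
interface ActivityEntry { id: number; type: string; }

const activity = new LatestValue<ActivityEntry[]>([]);

const seenByEarly: ActivityEntry[][] = [];
activity.subscribe(entries => seenByEarly.push(entries));

// Results for the first filter.
activity.next([{ id: 1, type: "login" }, { id: 2, type: "edit" }]);
// Filter changed: the new results replace the old array instead of appending.
activity.next([{ id: 3, type: "login" }]);

// A late subscriber receives only the latest result set.
const seenByLate: ActivityEntry[][] = [];
activity.subscribe(entries => seenByLate.push(entries));
```

With RxJS itself, the same idea would be a `BehaviorSubject` of arrays on which `next` is called with each new result set. The trade-off is that every subscriber receives the whole array rather than individual entries.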

NRaf asked Jul 03 '18 01:07



2 Answers

If you want to be able to reset a subject without having its subscribers explicitly unsubscribe and resubscribe, you could do something like this:

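import { Observable, Subject } from "rxjs";
import { startWith, switchMap } from "rxjs/operators";

function resettable<T>(factory: () => Subject<T>): {
  observable: Observable<T>,
  reset(): void,
  subject: Subject<T>
} {
  // Emits each time the inner subject is replaced.
  const resetter = new Subject<any>();
  // Stable subject that callers push values into.
  const source = new Subject<T>();
  // Inner subject (for example a ReplaySubject) that is swapped out on reset.
  let destination = factory();
  let subscription = source.subscribe(destination);
  return {
    // Subscribers always follow whichever inner subject is current.
    observable: resetter.asObservable().pipe(
      startWith(null),
      switchMap(() => destination)
    ),
    reset: () => {
      // Detach the old inner subject, create a fresh one, then switch subscribers to it.
      subscription.unsubscribe();
      destination = factory();
      subscription = source.subscribe(destination);
      resetter.next();
    },
    subject: source
  };
}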

resettable returns an object containing:

  • an observable, to which consumers of the resettable subject should subscribe;
  • a subject, upon which you'd call next, error or complete; and
  • a reset function that replaces the (inner) subject with a fresh one.

You'd use it like this:

import { ReplaySubject } from "rxjs";
const { observable, reset, subject } = resettable(() => new ReplaySubject(3));
observable.subscribe(value => console.log(`a${value}`)); // a1, a2, a3, a4, a5, a6
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
observable.subscribe(value => console.log(`b${value}`)); // b2, b3, b4, b5, b6
reset();
observable.subscribe(value => console.log(`c${value}`)); // c5, c6
subject.next(5);
subject.next(6);
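
To see why b starts at 2 and c starts at 5, it can help to model the behaviour without RxJS. The following is only a sketch of the semantics, not how RxJS or the resettable function above is implemented: `ResettableReplayModel` is a hypothetical class that keeps the last `bufferSize` values, replays them to new subscribers, and forgets them on reset while leaving existing subscribers attached.

```typescript
// Hypothetical model of a replay buffer with a reset; not the RxJS implementation.
type Handler<T> = (value: T) => void;

class ResettableReplayModel<T> {
  private buffer: T[] = [];
  private handlers: Handler<T>[] = [];
  private readonly bufferSize: number;

  constructor(bufferSize: number) {
    this.bufferSize = bufferSize;
  }

  subscribe(handler: Handler<T>): void {
    // Replay the buffered values to the new subscriber, then register it.
    this.buffer.forEach(v => handler(v));
    this.handlers.push(handler);
  }

  next(value: T): void {
    // Keep only the most recent bufferSize values.
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) {
      this.buffer.shift();
    }
    this.handlers.forEach(h => h(value));
  }

  // Forget the buffered values; existing subscribers stay attached.
  reset(): void {
    this.buffer = [];
  }
}

const trace: string[] = [];
const model = new ResettableReplayModel<number>(3);

model.subscribe(v => trace.push(`a${v}`));
[1, 2, 3, 4].forEach(v => model.next(v));
model.subscribe(v => trace.push(`b${v}`)); // replays 2, 3, 4
model.reset();
model.subscribe(v => trace.push(`c${v}`)); // nothing to replay after the reset
model.next(5);
model.next(6);

console.log(trace.join(", "));
// a1, a2, a3, a4, b2, b3, b4, a5, b5, c5, a6, b6, c6
```

Each subscriber ends up with the same values as in the comments of the usage example: a sees 1 to 6, b sees 2 to 6, and c sees only 5 and 6.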
cartant answered Nov 01 '22 11:11


Here is a class that wraps the resettable factory from the answer above, so you can write const myReplaySubject = new ResettableReplaySubject<myType>() and call myReplaySubject.reset() when the buffer should be cleared.

import { ReplaySubject, Subject, Observable, SchedulerLike } from "rxjs";
import { startWith, switchMap } from "rxjs/operators";

export class ResettableReplaySubject<T> extends ReplaySubject<T> {

    reset: () => void;

    constructor(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike) {
        super(bufferSize, windowTime, scheduler);
        const resetable = this.resettable(() => new ReplaySubject<T>(bufferSize, windowTime, scheduler));

        // Copy the own fields of the inner subject and observable onto this
        // instance so that it behaves like them. These are internal RxJS fields,
        // so this may break with other RxJS versions.
        Object.keys(resetable.subject).forEach(key => {
            (this as any)[key] = (resetable.subject as any)[key];
        });

        Object.keys(resetable.observable).forEach(key => {
            (this as any)[key] = (resetable.observable as any)[key];
        });

        this.reset = resetable.reset;
    }

    // Same factory as in the answer above, using the class's own type parameter.
    private resettable(factory: () => Subject<T>): {
        observable: Observable<T>,
        reset(): void,
        subject: Subject<T>,
    } {
        const resetter = new Subject<any>();
        const source = new Subject<T>();
        let destination = factory();
        let subscription = source.subscribe(destination);
        return {
            observable: resetter.asObservable().pipe(
                startWith(null),
                switchMap(() => destination)
            ) as Observable<T>,
            reset: () => {
                subscription.unsubscribe();
                destination = factory();
                subscription = source.subscribe(destination);
                resetter.next();
            },
            subject: source,
        };
    }
}
Erez Shlomo answered Nov 01 '22 12:11