I have a filterable 'activity log' that's currently implemented using a ReplaySubject
(since a few components use it and they might subscribe at different times).
When the user changes the filter settings, a new request is made, however the results are appended to the ReplaySubject
rather than replacing it.
I was wondering if there is anyway to update the ReplaySubject
to only send through the new items using something like a switchMap
?
Otherwise, I might need to either use a BehaviorSubject
that returns an array of all the activity entries or recreate the ReplaySubject
and notify users (probably by using another observable) to unsubscribe and resubscribe to the new observable.
The ReplaySubject is comparable to the BehaviorSubject in the way that it can send “old” values to new subscribers. It however has the extra characteristic that it can record a part of the observable execution and therefore store multiple old values and “replay” them to new subscribers.
ReplaySubject is a variant of a Subject which keeps a cache of previous values emitted by a source observable and sends them to all new observers immediately on subscription. This behavior of replaying a sequence of old values to new subscribes is where the name for this type of a subject comes from.
If you want to provide an initial value at subscription time, even if nothing has been pushed to a Subject so far, use the BehaviorSubject. If you want to have the last value replayed to an observer, even if a Subject is already closed, use the ReplaySubject(1).
You need to explicitly state that the actionsSubject may receive Action | null values, namely: private actionsSubject = new BehaviorSubject<Action | null>(null); As an alternative, you can switch from BehaviorSubject to ReplaySubject(1).
If you want to be able to reset a subject without having its subscribers explicitly unsubscribe and resubscribe, you could do something like this:
import { Observable, Subject } from "rxjs";
import { startWith, switchMap } from "rxjs/operators";
function resettable<T>(factory: () => Subject<T>): {
observable: Observable<T>,
reset(): void,
subject: Subject<T>
} {
const resetter = new Subject<any>();
const source = new Subject<T>();
let destination = factory();
let subscription = source.subscribe(destination);
return {
observable: resetter.asObservable().pipe(
startWith(null),
switchMap(() => destination)
),
reset: () => {
subscription.unsubscribe();
destination = factory();
subscription = source.subscribe(destination);
resetter.next();
},
subject: source
};
}
resettable
will return an object containing:
observable
to which subscribers to the re-settable subject should subscribe;subject
upon which you'd call next
, error
or complete
; andreset
function that will reset the (inner) subject.You'd use it like this:
import { ReplaySubject } from "rxjs";
const { observable, reset, subject } = resettable(() => new ReplaySubject(3));
observable.subscribe(value => console.log(`a${value}`)); // a1, a2, a3, a4, a5, a6
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
observable.subscribe(value => console.log(`b${value}`)); // b2, b3, b4, b5, b6
reset();
observable.subscribe(value => console.log(`c${value}`)); // c5, c6
subject.next(5);
subject.next(6);
Here is a class that is using the resettable factory posted here before, so you can use
const myReplaySubject = new ResettableReplaySubject<myType>()
import { ReplaySubject, Subject, Observable, SchedulerLike } from "rxjs";
import { startWith, switchMap } from "rxjs/operators";
export class ResettableReplaySubject<T> extends ReplaySubject<T> {
reset: () => void;
constructor(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike) {
super(bufferSize, windowTime, scheduler);
const resetable = this.resettable(() => new ReplaySubject<T>(bufferSize, windowTime, scheduler));
Object.keys(resetable.subject).forEach(key => {
this[key] = resetable.subject[key];
})
Object.keys(resetable.observable).forEach(key => {
this[key] = resetable.observable[key];
})
this.reset = resetable.reset;
}
private resettable<T>(factory: () => Subject<T>): {
observable: Observable<T>,
reset(): void,
subject: Subject<T>,
} {
const resetter = new Subject<any>();
const source = new Subject<T>();
let destination = factory();
let subscription = source.subscribe(destination);
return {
observable: resetter.asObservable().pipe(
startWith(null),
switchMap(() => destination)
) as Observable<T>,
reset: () => {
subscription.unsubscribe();
destination = factory();
subscription = source.subscribe(destination);
resetter.next();
},
subject: source,
};
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With