
rxjs5/Angular - Clear ReplaySubject buffer

I have an Angular service that shares a data stream (the result of an HTTP call) with several components asynchronously. Occasionally I need to repeat the HTTP call in response to a user action.

I am using a ReplaySubject to cache the loaded values and deliver them to subscribers that subscribe after the HTTP call has completed.

Is there a way to clear the ReplaySubject's buffer before making subsequent HTTP calls? I also suspect that I need to unsubscribe from the previous call in order to avoid a leak.

Service:

@Injectable()
export class GreatDataService {

    public data$: ReplaySubject<any>;
    private subs: Subscription;

    constructor(private http: Http) {
        this.data$ = new ReplaySubject(1);
    }

    public refresh() {
        if (this.subs) {
            this.subs.unsubscribe();
            this.subs = null;
        }
        this.subs = this.http.get('/api').subscribe(this.data$);
    }
}

Top level section component:

...
    constructor(private greatDataService: GreatDataService) {}
    ngOnInit() {
        this.greatDataService.refresh();
    }
...

Component 1:

...
    constructor(private greatDataService: GreatDataService) {}
    ngOnInit() {
        this.greatDataService.data$.subscribe(
            x => console.log('subscriber 1: ' + x),
            err => console.log('subscriber 1: ' + err),
            () => console.log('subscriber 1: Completed')
        );
    }
...

Component 2:

...
    constructor(private greatDataService: GreatDataService) {}
    ngOnInit() {
        this.greatDataService.data$.subscribe(
            x => console.log('subscriber 2: ' + x),
            err => console.log('subscriber 2: ' + err),
            () => console.log('subscriber 2: Completed')
        );
    }
...
Thibs asked Jan 25 '17 18:01



2 Answers

You can use an Rx.Subject to emit 'get-fresh-data' events and pipe it through .switchMap(), so that fresh data is fetched each time refresh() is invoked. The following example shows how:

function getData() {
  return Rx.Observable.of('retrieving new data')
    .timestamp()
    .delay(500);
}

// In this example I use an event stream of clicks.
// You can use Rx.Subject() instead and manually .next() a new value
// when somebody invokes .refresh().
const refreshDataClickStream = Rx.Observable.fromEvent(document.getElementById('refresh_stream'), 'click');

const dataStream = refreshDataClickStream
  .startWith('PAGE_LOAD') /* always fetch data once when the stream starts */
  .switchMap(() => getData()) /* getData() is not cached, so we switchMap to a new instance, abandoning the previous result */
  .publishReplay().refCount(); /* refCount so everybody gets the same results */

dataStream.subscribe(data => console.log('sub1 data: ' + JSON.stringify(data)));

setTimeout(() => {
  console.log('late arriving subscription (gets same stream)');
  dataStream.subscribe(data => console.log('sub2 data: ' + JSON.stringify(data)));
}, 2000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>
<input type='button' id='refresh_stream' value="refresh_stream" />
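
For readers who want to see the "latest refresh wins, late subscribers get the cached result" behaviour in isolation, here is a minimal dependency-free sketch. It is not RxJS and not the code above: the class `RefreshableData`, the `DataFetcher` callback type, and the fake fetcher are all hypothetical names made up for illustration. As a further assumption of this sketch, the cached value is dropped as soon as a new refresh starts, which is what the question asks for.

```typescript
// Hypothetical names throughout; nothing here is part of RxJS or Angular.
type DataListener<T> = (value: T) => void;
type DataFetcher<T> = (done: (value: T) => void) => void;

class RefreshableData<T> {
  private fetcher: DataFetcher<T>;
  private listeners: DataListener<T>[] = [];
  private cached: { value: T } | null = null; // acts like a replay buffer of size 1
  private latestRequest = 0;

  constructor(fetcher: DataFetcher<T>) {
    this.fetcher = fetcher;
  }

  // Start a new fetch; a response belonging to an earlier fetch is ignored.
  refresh(): void {
    const request = ++this.latestRequest;
    this.cached = null; // assumption: clear stale data when a refresh starts
    this.fetcher(value => {
      if (request !== this.latestRequest) return; // superseded by a newer refresh
      this.cached = { value };
      this.listeners.slice().forEach(listener => listener(value));
    });
  }

  // Replay the cached value (if any), then deliver future values.
  subscribe(listener: DataListener<T>): () => void {
    if (this.cached) listener(this.cached.value);
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }
}

// Usage with a fake fetcher that lets us decide when each response arrives.
const pending: Array<(value: string) => void> = [];
const data = new RefreshableData<string>(done => { pending.push(done); });

const seen: string[] = [];
data.subscribe(v => { seen.push('sub1: ' + v); });

data.refresh();       // request 1
data.refresh();       // request 2 supersedes request 1
pending[0]('stale');  // ignored
pending[1]('fresh');  // delivered to sub1 and cached

data.subscribe(v => { seen.push('sub2: ' + v); }); // late subscriber gets the cached value
console.log(seen);    // [ 'sub1: fresh', 'sub2: fresh' ]
```

The request counter plays the role that switchMap plays in the RxJS version: only the most recent request is allowed to deliver a result.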
Mark van Straten answered Nov 07 '22 13:11



The problem becomes easier once you note that the buffer consumes data from the original source, and that subscribers to the buffered data can switch to the original source after receiving all the old values.

For example:

let data$ = new Subject<any>() // Data source

let buffer$ = new ReplaySubject<any>() 
let bs = data$.subscribe(buffer$)  // Buffer subscribes to data

let getRepeater = () => {
   return concat(
      buffer$.pipe(takeUntil(data$)), // Switch from buffer to original source when data comes in
      data$
   )
}

To clear the buffer, replace it with a new one:

// Begin Buffer Clear Sequence
bs.unsubscribe()
buffer$.complete()

buffer$ = new ReplaySubject()
bs = data$.subscribe(buffer$)
buffObs.next(buffer$) // buffObs is introduced in the next snippet

To make the code more functional, you can replace the function getRepeater() with a subject that always holds a reference to the latest buffer:

let buffObs = new ReplaySubject<ReplaySubject<any>>(1)
buffObs.next(buffer$)        

let repeater$ = concat(buffObs.pipe(
   takeUntil(data$),
   switchMap((e) => e),                    
), data$)

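The following complete example

    // Import paths assume RxJS 6 or later
    import { Subject, ReplaySubject, concat } from 'rxjs'
    import { takeUntil, switchMap } from 'rxjs/operators'

    let data$ = new Subject<any>()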

    let buffer$ = new ReplaySubject<any>()
    let bs = data$.subscribe(buffer$)         

    let buffObs = new ReplaySubject<ReplaySubject<any>>(1)
    buffObs.next(buffer$)        

    let repeater$ = concat(buffObs.pipe(
      takeUntil(data$),
      switchMap((e) => e),                    
    ), data$)

    // Begin Test

    data$.next(1)
    data$.next(2)
    data$.next(3)

    console.log('rep1 sub')
    let r1 = repeater$.subscribe((e) => {          
      console.log('rep1 ' + e)
    })

    // Begin Buffer Clear Sequence
    bs.unsubscribe()
    buffer$.complete()

    buffer$ = new ReplaySubject()
    bs = data$.subscribe(buffer$)
    buffObs.next(buffer$)
    // End Buffer Clear Sequence

    console.log('rep2 sub')
    let r2 = repeater$.subscribe((e) => {
      console.log('rep2 ' + e)
    })

    data$.next(4)
    data$.next(5)
    data$.next(6)

    r1.unsubscribe()
    r2.unsubscribe()

    data$.next(7)
    data$.next(8)
    data$.next(9)        

    console.log('rep3 sub')
    let r3 = repeater$.subscribe((e) => {
      console.log('rep3 ' + e)
    })

outputs:

rep1 sub
rep1 1
rep1 2
rep1 3
rep2 sub
rep1 4
rep2 4
rep1 5
rep2 5
rep1 6
rep2 6
rep3 sub
rep3 4
rep3 5
rep3 6
rep3 7
rep3 8
rep3 9
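
The "replace the buffer" idea can also be sketched without RxJS, which may make the replay-then-live behaviour easier to follow. This is only an illustration under stated assumptions: the class `ResettableReplay` and its `clear()` method are hypothetical and not part of RxJS, and the sketch ignores completion and error handling. It runs the same sequence of values as the test above.

```typescript
// Hypothetical helper; the name and API are assumptions for this sketch, not RxJS.
type ValueHandler<T> = (value: T) => void;

class ResettableReplay<T> {
  private buffer: T[] = [];
  private handlers: ValueHandler<T>[] = [];

  next(value: T): void {
    this.buffer.push(value); // record for late subscribers
    this.handlers.slice().forEach(handler => handler(value)); // deliver live
  }

  // Swap in a fresh buffer; existing subscribers stay connected.
  clear(): void {
    this.buffer = [];
  }

  subscribe(handler: ValueHandler<T>): () => void {
    this.buffer.slice().forEach(value => handler(value)); // replay old values first
    this.handlers.push(handler); // then receive live values
    return () => {
      this.handlers = this.handlers.filter(h => h !== handler);
    };
  }
}

const source = new ResettableReplay<number>();
const log: string[] = [];

[1, 2, 3].forEach(n => source.next(n));
const stop1 = source.subscribe(n => { log.push('rep1 ' + n); }); // replays 1, 2, 3

source.clear();
const stop2 = source.subscribe(n => { log.push('rep2 ' + n); }); // nothing to replay

[4, 5, 6].forEach(n => source.next(n));
stop1();
stop2();
[7, 8, 9].forEach(n => source.next(n));

source.subscribe(n => { log.push('rep3 ' + n); }); // replays 4 through 9
console.log(log.join('\n'));
```

Apart from the "sub" marker lines, the logged sequence is the same as the output listed above: rep1 sees 1 to 6, rep2 sees 4 to 6, and rep3 sees 4 to 9.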

Kin answered Nov 07 '22 11:11
