
How can I make an rxjs Subject repeat its last emission periodically?

I'm trying to create an RxJS Subject that repeats its output after a period of inactivity. My first attempt used debounceTime, but that doesn't appear to trigger more than once.

I would like the subject to emit immediately when next is called, and repeat that emission periodically until a new value is provided:

Inputs:  ---a---------b---------c----
Outputs: ---a---a---a-b---b---b-c---c

Currently I have something like so:

const subject = new rx.Subject()
subject.debounceTime(5000)
       .subscribe(subject)

subject.subscribe(value => console.log(`emitted: ${value}`))
subject.take(1).subscribe(next => next, error => error, () => {
    console.log('emitted once')
})
subject.take(2).subscribe(next => next, error => error, () => {
    console.log('emitted twice')
})
subject.take(3).subscribe(next => next, error => error, () => {
    console.log('emitted thrice')
})

subject.next('a')

However, this only emits 'a' once, and the output 'emitted thrice' is never seen.

Could somebody please help me understand what's going wrong here?

Micheal Hill asked Apr 18 '17 02:04


2 Answers

If I understand your problem correctly, I think you can use the repeatWhen() operator:

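// Assumes the RxJS 5 global `Rx`, with operators patched onto Observable
const { Observable, ReplaySubject } = Rx;

// ReplaySubject(1) replays the most recent value to every new subscriber
const subject = new ReplaySubject(1);
subject.next('a');

subject
  .take(1) // complete after one value so that repeatWhen() can resubscribe
  .repeatWhen(() => Observable.timer(500, 500)) // resubscribe every 500 ms
  .subscribe(val => console.log(val));

// Replace the stored value after 1400 ms
setTimeout(() => subject.next('b'), 1400);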

See live demo: https://jsbin.com/nufasiq/2/edit?js,console

This prints the following output to the console at 500 ms intervals:

a
a
a
b
b
b

The take(1) is needed so that the chain completes after each value. repeatWhen() intercepts that completion and, on every tick of the timer, subscribes to its source Observable again, at which point the ReplaySubject replays its latest value.
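
To check the timing this produces, here is a small plain-JavaScript model of "replay the latest value on every tick". It is only a sketch and does not use RxJS: replayOnTicks and its [timeMs, value] input format are made-up names for illustration, and it assumes the first subscription happens at t = 0 with a timer tick every 500 ms after that.

```javascript
// Plain-JavaScript model (no RxJS) of "re-read the latest value on every tick".
// `updates` is a hypothetical list of [timeMs, value] pairs, sorted by time.
function replayOnTicks(updates, periodMs, untilMs) {
  // Latest value that has been set at or before time t, or undefined if none.
  const latestAt = (t) => {
    let latest;
    for (const [time, value] of updates) {
      if (time <= t) latest = value;
    }
    return latest;
  };

  const out = [];
  // t = 0 models the initial subscription; every later t is one timer tick.
  for (let t = 0; t <= untilMs; t += periodMs) {
    const value = latestAt(t);
    if (value !== undefined) out.push([t, value]);
  }
  return out;
}

// 'a' is set at 0 ms and 'b' at 1400 ms, as in the answer's snippet.
const timeline = replayOnTicks([[0, 'a'], [1400, 'b']], 500, 2500);
console.log(timeline.map(([t, v]) => `${t}ms: ${v}`).join('\n'));
// 0ms: a
// 500ms: a
// 1000ms: a
// 1500ms: b
// 2000ms: b
// 2500ms: b
```

In this model 'b' arrives at 1400 ms but is first printed on the 1500 ms tick, which suggests that this approach repeats the latest value on a fixed schedule rather than emitting a new value the moment it is set.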

martin answered Nov 17 '22 13:11


Another option is to use switchMap and interval:

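const source = new Rx.Subject();

source
  // switchMap drops the previous interval whenever source emits a new value;
  // interval(5000) first fires 5000 ms after each new value
  .switchMap((val) => Rx.Observable.interval(5000).map(() => val))
  .subscribe(val => console.log(val));

// Feed the subject a new value (0, 1, 2, ...) every 11 seconds
Rx.Observable.interval(11000).subscribe(x => source.next(x));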

see a demo in jsbin
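
The question also asks for each value to be emitted immediately when next is called. Below is a minimal sketch of that "emit now, then repeat until replaced" behaviour in plain JavaScript, with no RxJS involved. createRepeater, its parameters and the injectable timers object are hypothetical names made up for this illustration, and the 100 ms period is only there to keep the demo short.

```javascript
// Plain-JavaScript sketch (no RxJS) of "emit now, then repeat until replaced".
// createRepeater and the `timers` parameter are hypothetical, for illustration.
function createRepeater(
  periodMs,
  onEmit,
  timers = {
    setInterval: (fn, ms) => setInterval(fn, ms),
    clearInterval: (handle) => clearInterval(handle),
  }
) {
  let handle = null;

  return {
    // Emit the value right away and restart the repeat timer for it.
    next(value) {
      if (handle !== null) timers.clearInterval(handle); // drop the old value's timer
      onEmit(value);
      handle = timers.setInterval(() => onEmit(value), periodMs);
    },
    // Stop repeating; nothing is emitted after this call.
    stop() {
      if (handle !== null) timers.clearInterval(handle);
      handle = null;
    },
  };
}

// Short demo with real timers.
const repeater = createRepeater(100, (v) => console.log(`emitted: ${v}`));
repeater.next('a');                        // logs 'a' now, then again every 100 ms
setTimeout(() => repeater.next('b'), 250); // switches to 'b' as soon as it is set
setTimeout(() => repeater.stop(), 500);    // end the demo so the process can exit
```

Clearing the previous timer inside next() plays the role that switchMap plays in the snippet above: only the most recent value keeps repeating. In RxJS 5 itself, the closest change to that snippet would probably be swapping interval(5000) for timer(0, 5000), which fires its first tick without waiting a full period.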

Ofer Herman answered Nov 17 '22 13:11