I'm trying to create an RxJS Subject that repeats its output after a period of inactivity. My initial design used debounceTime, but that doesn't appear to trigger more than once.
I would like the subject to emit immediately when next is called, and to repeat that emission periodically until a new value is provided:
Inputs:  ---a---------b---------c----
Outputs: ---a---a---a-b---b---b-c---c
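To make the intended timing concrete, the marble diagram above can be reproduced with a small dependency-free simulation. This is only a sketch of the desired behaviour, not RxJS code: repeatMarble is a hypothetical helper, each character is treated as one time frame, and the repeat period of 4 frames is an assumption read off the diagram.

```javascript
// Hypothetical helper (not an RxJS API): expands an input marble string into
// the expected output, re-emitting the latest value every `period` frames
// until a new value arrives.
function repeatMarble(input, period) {
  let latest = null; // most recent input value, if any
  let sinceEmit = 0; // frames elapsed since the last emission
  let output = '';

  for (const frame of input) {
    if (frame !== '-') {
      // New input: emit it immediately and restart the repeat timer.
      latest = frame;
      sinceEmit = 0;
      output += frame;
    } else if (latest !== null && ++sinceEmit === period) {
      // A full period of inactivity has passed: repeat the latest value.
      sinceEmit = 0;
      output += latest;
    } else {
      output += '-';
    }
  }
  return output;
}

console.log(repeatMarble('---a---------b---------c----', 4));
// → ---a---a---a-b---b---b-c---c
```

Note that a new input both emits at once and resets the timer, which is why 'b' appears two frames after the last 'a' rather than waiting for the next repeat slot.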
Currently I have something like this:
const subject = new rx.Subject()
subject.debounceTime(5000)
  .subscribe(subject)
subject.subscribe(value => console.log(`emitted: ${value}`))
subject.take(1).subscribe(next => next, error => error, () => {
  console.log('emitted once')
})
subject.take(2).subscribe(next => next, error => error, () => {
  console.log('emitted twice')
})
subject.take(3).subscribe(next => next, error => error, () => {
  console.log('emitted thrice')
})
subject.next('a')
However, this emits 'a' only once, and the output 'emitted thrice' is never seen.
Could somebody please help me understand what's going wrong here?
If I understand your problem correctly, I think you can use the repeatWhen() operator:
const subject = new ReplaySubject(1);
subject.next('a');
subject
  .take(1)
  .repeatWhen(() => Observable.timer(500, 500))
  .subscribe(val => console.log(val));
setTimeout(() => subject.next('b'), 1400);
See live demo: https://jsbin.com/nufasiq/2/edit?js,console
This prints the following output to the console at 500ms intervals:
a
a
a
b
b
b
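The take(1) is necessary here so that the chain completes. repeatWhen() intercepts that completion and resubscribes to its source Observable each time the timer emits. Because the source is a ReplaySubject(1), every new subscription immediately receives the most recent value.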
Another option is to use switchMap and interval:
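const source = new Rx.Subject();
source
  // Each new value cancels the previous inner interval and starts a fresh one.
  // Note: interval() waits one full period before its first emission; use
  // Rx.Observable.timer(0, 5000) instead if the value should also be emitted immediately.
  .switchMap((val) => Rx.Observable.interval(5000).map(() => val))
  .subscribe(val => console.log(val));
// Test driver: push a new value into the subject every 11 seconds.
Rx.Observable.interval(11000).subscribe(x => source.next(x));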
see a demo in jsbin
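For readers who want to see the mechanics behind the switchMap approach, the same "drop the old timer, start a new one" idea can be sketched in plain JavaScript with setInterval and clearInterval. This is an illustration only, not how RxJS is implemented: createRepeater and its parameters are hypothetical names, and the startTimer/stopTimer arguments are an assumption added so the logic can be exercised without real delays. Unlike the interval-based snippet above, this sketch also emits the new value immediately, as the question asks.

```javascript
// Hypothetical helper, not part of RxJS. `startTimer` and `stopTimer` default
// to the global timer functions and are injectable only for testing.
function createRepeater(periodMs, listener, startTimer = setInterval, stopTimer = clearInterval) {
  let handle = null; // handle of the currently running repeat timer, if any

  // Cancel the running repeat timer, if there is one.
  function stop() {
    if (handle !== null) {
      stopTimer(handle);
      handle = null;
    }
  }

  // Accept a new value: switch away from the old timer, emit, then repeat.
  function next(value) {
    stop();                                               // drop the previous value's timer
    listener(value);                                      // emit the new value immediately
    handle = startTimer(() => listener(value), periodMs); // repeat it every periodMs
  }

  return { next, stop };
}
```

With `const repeater = createRepeater(5000, v => console.log(v))`, calling `repeater.next('a')` would log 'a' at once and then again every 5 seconds until `next` is called with another value or `repeater.stop()` is called.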