I'm working on a use case where a side effect should run if an observable has not emitted a value within a certain amount of time.
To give a practical use case:
This requires a timer to be started on the initial subscription and on every emitted value. The timer runs some function after the allotted time, unless a value is emitted first, in which case the timer resets. I'm struggling to do this the Rx way. Any help would be appreciated :)
debounceTime is the operator you're looking for: it only emits a value if no others follow within a specific timeout. Listening for the first message of the debounced stream will let you time out and clean up your websocket connection. If you need the timeout to start counting from the opening of the stream, rather than from the first message, prepend a dummy value with startWith. Concretely:
// Fires once, after `timeout` ms without a message (counted from subscription).
messages$.startWith(null)
  .debounceTime(timeout)
  .take(1)
  .subscribe(() => { /* side effects */ });
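To make the mechanics of that chain concrete, here is a minimal sketch of the same idea without Rx: a timer that is armed at the start, re-armed on every message, and fires at most once. The names `realSchedule` and `createIdleWatchdog` are hypothetical and not part of RxJS; the `schedule` parameter is an assumption added only so the timer can be swapped out in tests.

```javascript
// Hypothetical helpers for illustration; not part of RxJS.

// Default scheduler backed by setTimeout; returns a function that cancels the timer.
const realSchedule = (fn, ms) => {
  const id = setTimeout(fn, ms);
  return () => clearTimeout(id);
};

// Arms a timer immediately (the startWith(null) part), re-arms it on every
// touch() (the debounceTime part), and calls onIdle at most once (the take(1) part).
function createIdleWatchdog(timeoutMs, onIdle, schedule = realSchedule) {
  let cancel = null;
  let done = false;

  const arm = () => {
    if (cancel) cancel(); // drop the previous timer before starting a new one
    cancel = schedule(() => {
      cancel = null;
      done = true;
      onIdle();
    }, timeoutMs);
  };

  arm(); // the clock starts at "subscription", before any message arrives

  return {
    // Call for every incoming message; ignored once the timeout has fired.
    touch() {
      if (!done) arm();
    },
    // Call when the source ends normally, so the timeout never fires.
    stop() {
      done = true;
      if (cancel) cancel();
      cancel = null;
    },
  };
}
```

With a websocket, one might create the watchdog when the connection opens, call `touch()` from the message handler, and close the socket inside `onIdle`. The Rx version above does the same bookkeeping for you.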
Edit: if instead you're looking to end the message stream entirely when it times out (e.g. you clean up in the onComplete handler), just put the debounceTime inside a takeUntil:
// Completes the message stream after `timeout` ms of silence.
messages$.takeUntil(
  messages$.startWith(null)
    .debounceTime(timeout)
).subscribe(timeout_observer);
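Here timeout_observer: Observer<TMessage> is an observer that contains your cleanup in its onComplete handler. Note that this subscribes to messages$ twice, so if messages$ is cold (each subscription opens its own connection), share it first, for example with share().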
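For comparison, the takeUntil variant can be sketched without Rx as a wrapper around an observer: messages pass through until the silence timeout fires, then the wrapped observer is completed and nothing further is forwarded. The name `withIdleCompletion` and the optional `schedule` argument are hypothetical, introduced only for this illustration.

```javascript
// Hypothetical helper for illustration; not part of RxJS.
// Wraps an observer so that it completes after `timeoutMs` of silence.
function withIdleCompletion(observer, timeoutMs, schedule) {
  // Assumed scheduler shape: (fn, ms) => cancelFunction. Defaults to setTimeout.
  const startTimer = schedule || ((fn, ms) => {
    const id = setTimeout(fn, ms);
    return () => clearTimeout(id);
  });
  let cancel = null;
  let closed = false;

  // Runs on timeout or when the source ends; completes the observer exactly once.
  const finish = () => {
    if (closed) return;
    closed = true;
    if (cancel) cancel();
    cancel = null;
    if (observer.complete) observer.complete();
  };

  const arm = () => {
    if (cancel) cancel();
    cancel = startTimer(finish, timeoutMs);
  };

  arm(); // silence is measured from the start, like startWith(null)

  return {
    next(message) {
      if (closed) return; // like takeUntil: nothing passes after the timeout
      arm();
      if (observer.next) observer.next(message);
    },
    complete: finish, // the source ended on its own
  };
}
```

Because the cleanup lives in the observer's completion handler, it runs in both cases: when the source finishes normally and when the timeout cuts it off.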