Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Do side effect if observable has not emitted a value within X amount of time

Tags:

rxjs

I'm working on a use case that requires that if an observable has not emitted a value within a certain amount of time then we should do some side effect.

To give a practical use case:

  • open web socket connection
  • if no message has been sent/received within X time then close web socket connection and notify user

This requires for a timer to be initiated on every emitted value and upon initial subscription of observable which will then run some function after the allotted time or until a value is emitted in which the timer resets. I'm struggling to do this the Rx way. Any help would be appreciated :)

like image 496
Sam Elliott De La Torre Babá Avatar asked Oct 25 '17 19:10

Sam Elliott De La Torre Babá


1 Answers

debounceTime is the operator you're looking for: it only emits a value if no others follow within a specific timeout. Listening for the first message of the debounced stream will let you time out and clean up your websocket connection. If you need to time out starting from the opening of the stream, you can simply startWith. Concretely:

messages$.startWith(null)
         .debounceTime(timeout)
         .take(1)
         .subscribe(() => { /* side effects */ });

Edit: if instead you're looking to end the a message stream entirely when it times out (e.g. you clean up in the onComplete handler), just cram debounceTime into a takeUntil:

messages$.takeUntil(
  messages$.startWith(null)
           .debounceTime(timeout)
).subscribe(timeout_observer);

With a timeout_observable: Observer<TMessage> that contains your cleanup onComplete.

like image 67
concat Avatar answered Sep 19 '22 23:09

concat