Given an observable source, generated by polling the (changes of a) state of a low-level device...
// observable source metacode:
IObservable<DeviceState> source = Observable.Interval(TimeSpan.FromSeconds(0.5))
.Select(tick => new DeviceState(_device.ReadValue()))
.DistinctUntilChanged();
... and a consumer that updates the UI...
// UI metacode:
service.GetObservableDeviceStates()
.Subscribe(state => viewModel.CurrentState = state.ToString());
... I need to execute a custom action after x seconds of source's "inactivity", without interrupting the subscription to source. Something like this:
// UI metacode:
service.GetObservableDeviceStates()
.DoOnTimeout(TimeSpan.FromSeconds(x), () => viewModel.CurrentState = "Idle")
.Subscribe(state => viewModel.CurrentState = state.ToString());
What are the best practices? Possible solutions that come to mind are (I'm a Rx noob):
Returning something special "service-side" when nothing changes (instead of using DistinctUntilChanged) and dealing with it on the UI code:
service.GetObservableDeviceStates() .Subscribe(state => viewModel.CurrentState = state.Special ? "Idle" : state.ToString());
EDIT: as reported in the answer, the solution is:
service.GetObservableDeviceStates()
.Do(onNext)
.Throttle(TimeSpan.FromSeconds(x))
.Subscribe(onTimeout);
EDIT2 (Warning)
If onNext and onTimeout updates UI components, to avoid CrossThreadExceptions two ObserveOn(uiSynchronizationContext) are needed, since Throttle works on another thread!
service.GetObservableDeviceStates()
.ObserveOn(uiSynchronizationContext)
.Do(onNext)
.Throttle(TimeSpan.FromSeconds(x))
.ObserveOn(uiSynchronizationContext)
.Subscribe(onTimeout);
Timeout is more or less meant for observables which represent single asynchronous operations - for e.g., to return a default value or OnError
if said observable hasn't notified you in a certain amount of time.
The operator you're looking for is Throttle, even though it may not seem like it at first. Throttle(p)
gives you a stream which produces a value when the source stream has not produced a value for period p
.
Parallel to your existing code, you can use source.Throttle(period).Do(...side effect)
.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With