Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Rx Framework: execute an action on timeout without interrupting the original observable sequence

Given an observable source, generated by polling the (changes of a) state of a low-level device...

// observable source metacode:
IObservable<DeviceState> source = Observable.Interval(TimeSpan.FromSeconds(0.5))
    .Select(tick => new DeviceState(_device.ReadValue()))
    .DistinctUntilChanged();

... and a consumer that updates the UI...

// UI metacode:
service.GetObservableDeviceStates()
    .Subscribe(state => viewModel.CurrentState = state.ToString());

... I need to execute a custom action after x seconds of source's "inactivity", without interrupting the subscription to source. Something like this:

// UI metacode:
service.GetObservableDeviceStates()
    .DoOnTimeout(TimeSpan.FromSeconds(x), () => viewModel.CurrentState = "Idle")
    .Subscribe(state => viewModel.CurrentState = state.ToString());

What are the best practices? Possible solutions that come to mind are (I'm a Rx noob):

  1. Buffer (even if it's not so readable)
  2. Playing around this Timeout overload;
  3. Returning something special "service-side" when nothing changes (instead of using DistinctUntilChanged) and dealing with it on the UI code:

    service.GetObservableDeviceStates() .Subscribe(state => viewModel.CurrentState = state.Special ? "Idle" : state.ToString());

EDIT: as reported in the answer, the solution is:

        service.GetObservableDeviceStates()
            .Do(onNext)
            .Throttle(TimeSpan.FromSeconds(x))
            .Subscribe(onTimeout);

EDIT2 (Warning)

If onNext and onTimeout updates UI components, to avoid CrossThreadExceptions two ObserveOn(uiSynchronizationContext) are needed, since Throttle works on another thread!

        service.GetObservableDeviceStates()
            .ObserveOn(uiSynchronizationContext)
            .Do(onNext)
            .Throttle(TimeSpan.FromSeconds(x))
            .ObserveOn(uiSynchronizationContext)
            .Subscribe(onTimeout);
like image 604
Notoriousxl Avatar asked Oct 08 '12 17:10

Notoriousxl


1 Answers

Timeout is more or less meant for observables which represent single asynchronous operations - for e.g., to return a default value or OnError if said observable hasn't notified you in a certain amount of time.

The operator you're looking for is Throttle, even though it may not seem like it at first. Throttle(p) gives you a stream which produces a value when the source stream has not produced a value for period p.

Parallel to your existing code, you can use source.Throttle(period).Do(...side effect).

like image 169
Asti Avatar answered Nov 07 '22 15:11

Asti