Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Adding an observable sequence after subscription

We are using Rx to monitor activity within our silverlight application so that we can display a message to the user after a period of inactivity.

We are turning events (mouse moves etc.) into observables and then merging the observables together to create a single (allActivity) observable. We then throttle the allActivity observable using a timespan and something subscribes to be notified when the system has been inactive for a period of time.

How can I add a new observable/ sequence to this after the subscription (so that the subscription picks this up without unsubscribing and resubscribing).

e.g. merge several sequences together, throttle, subscribe. Now add an additional sequence to the observable that has been subscribed to.

Example code:

private IObservable<DateTime> allActivity;
public void CreateActivityObservables(UIElement uiElement)
{
    // Create IObservables of event types we are interested in and project them as DateTimes
    // These are our observables sequences that can push data to subscribers/ observers 
    // NB: These are like IQueryables in the sense that they do not iterate over the sequence just provide an IObservable type
    var mouseMoveActivity = Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(h => uiElement.MouseMove += h, h => uiElement.MouseMove -= h)
                                      .Select(o => DateTime.Now);

    var mouseLeftButtonActivity = Observable.FromEventPattern<MouseButtonEventHandler, MouseButtonEventArgs>(h => uiElement.MouseLeftButtonDown += h, h => uiElement.MouseLeftButtonDown -= h)
                                            .Select(o => DateTime.Now);

    var mouseRightButtonActivity = Observable.FromEventPattern<MouseButtonEventHandler, MouseButtonEventArgs>(h => uiElement.MouseRightButtonDown += h, h => uiElement.MouseRightButtonDown -= h)
                                             .Select(o => DateTime.Now);

    var mouseWheelActivity = Observable.FromEventPattern<MouseWheelEventHandler, MouseWheelEventArgs>(h => uiElement.MouseWheel += h, h => uiElement.MouseWheel -= h)
                                       .Select(o => DateTime.Now);

    var keyboardActivity = Observable.FromEventPattern<KeyEventHandler, KeyEventArgs>(h => uiElement.KeyDown += h, h => uiElement.KeyDown -= h)
                                     .Select(o => DateTime.Now);

    var streetViewContainer = HtmlPage.Document.GetElementById("streetViewContainer");
        var mouseMoveHandler = new EventHandler<HtmlEventArgs>(this.Moo);
        bool b = streetViewContainer.AttachEvent("mousemove", mouseMoveHandler);

    var browserActivity = Observable.FromEventPattern<Landmark.QDesk.ApplicationServices.IdleTimeoutService.MouseMoveHandler, HtmlEventArgs>(h => this.MyMouseMove += h, h => this.MyMouseMove -= h).Select(o => DateTime.Now);

    // Merge the IObservables<DateTime> together into one stream/ sequence
    this.allActivity = mouseMoveActivity.Merge(mouseLeftButtonActivity)
                                        .Merge(mouseRightButtonActivity)
                                        .Merge(mouseWheelActivity)
                                        .Merge(keyboardActivity)
                                        .Merge(browserActivity);
}

public IDisposable Subscribe(TimeSpan timeSpan, Action<DateTime> timeoutAction)
{
    IObservable<DateTime> timeoutNotification = this.allActivity.Merge   (IdleTimeoutService.GetDateTimeNowObservable())
                                                                .Throttle(timeSpan)
                                                                    .ObserveOn(Scheduler.ThreadPool);

    return timeoutNotification.Subscribe(timeoutAction);
}
like image 327
user1040208 Avatar asked Nov 10 '11 17:11

user1040208


2 Answers

There's an overload to Merge that takes in an IObservable<IObservable<TSource>>. Make the outer sequence a Subject<IObservable<TSource>> and call OnNext to it when you want to add another source to the bunch. The Merge operator will receive the source and subscribe to it:

var xss = new Subject<IObservable<int>>();
xss.Merge().Subscribe(x => Console.WriteLine(x));

xss.OnNext(Observable.Interval(TimeSpan.FromSeconds(1.0)).Select(x => 23 + 8 * (int)x));
xss.OnNext(Observable.Interval(TimeSpan.FromSeconds(0.8)).Select(x => 17 + 3 * (int)x));
xss.OnNext(Observable.Interval(TimeSpan.FromSeconds(1.3)).Select(x => 31 + 2 * (int)x));
...
like image 181
Bart De Smet Avatar answered Oct 18 '22 17:10

Bart De Smet


The easiest way to do this would be to use an intermediate subject in place of the Merge calls.

Subject<DateTime> allActivities = new Subject<DateTime>();
var activitySubscriptions = new CompositeDisposable();

activitySubscriptions.Add(mouseMoveActivity.Subscribe(allActivities));
activitySubscriptions.Add(mouseLeftButtonActivity.Subscribe(allActivities));
//etc ...

//subscribe to activities
allActivities.Throttle(timeSpan)
             .Subscribe(timeoutAction);

//later add another
activitySubscriptions.Add(newActivity.Subscribe(allActivities));

The Subject class will stop passing OnNext (and further OnError and OnCompleted) events from any of the observables it is subscribed to if it receives any OnError or OnCompleted.

The main difference between this approach and your sample is that it subscribes to all the events when the subject is created, rather than when you subscribe to the merged observable. Since all of the observables in your example are hot, the difference should not be noticeable.

like image 36
Gideon Engelberth Avatar answered Oct 18 '22 17:10

Gideon Engelberth