Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Using IObservable (Rx) as a INotifyCollectionChanged Replacement for MVVM?

I have been looking into using Rx in an MVVM framework. The idea is to use 'live' LINQ queries over in-memory datasets to project data into View Models to bind with.

Previously this has been possible with the use of INotifyPropertyChanged/INotifyCollectionChanged and an open source library called CLINQ. The potential with Rx and IObservable is to move to a much more declarative ViewModel using Subject classes to propagate changed events from the source model through to the View. A conversion from IObservable to the regular databinding interfaces would be needed for the last step.

The problem is that Rx doesn't seem to support the notification that an entity has been removed from the stream. Example below.
The code shows a POCO which uses BehaviorSubject class for the field state. The code goes onto to create a collection of these entities and use Concat to merge the filter streams together. This means that any changes to the POCOs are reported to a single stream.

A filter for this stream is setup to filter for Rating==0. The subscription simply outputs the result to the debug window when an even occurs.

Settings Rating=0 on any element will trigger the event. But setting Rating back to 5 will not see any events.

In the case of CLINQ the output of the query will support INotifyCollectionChanged - so that items added and removed from the query result will fire the correct event to indicate the query result has changed (an item added or removed).

The only way I can think of address this is to set-up two streams with oppossite (dual) queries. An item added to the opposite stream implies removal from the resultset. Failing that, I could just use FromEvent and not make any of the entity models observable - which makes Rx more of just an Event Aggregator. Any pointers?

using System;
using System.ComponentModel;
using System.Linq;
using System.Collections.Generic;

namespace RxTest
{

    public class TestEntity : Subject<TestEntity>, INotifyPropertyChanged
    {
        public IObservable<string> FileObservable { get; set; }
        public IObservable<int> RatingObservable { get; set; }

        public string File
        {
            get { return FileObservable.First(); }
            set { (FileObservable as IObserver<string>).OnNext(value); }
        }

        public int Rating
        {
            get { return RatingObservable.First(); }
            set { (RatingObservable as IObserver<int>).OnNext(value); }
        }

        public event PropertyChangedEventHandler PropertyChanged;

        public TestEntity()
        {
            this.FileObservable = new BehaviorSubject<string>(string.Empty);
            this.RatingObservable = new BehaviorSubject<int>(0);
            this.FileObservable.Subscribe(f => { OnNotifyPropertyChanged("File"); });
            this.RatingObservable.Subscribe(f => { OnNotifyPropertyChanged("Rating"); });
        }

        private void OnNotifyPropertyChanged(string property)
        {
            if (PropertyChanged != null) PropertyChanged(this, new PropertyChangedEventArgs(property));
            // update the class Observable
            OnNext(this);
        }

    }

    public class TestModel
    {
        private List<TestEntity> collection { get; set; }
        private IDisposable sub;

        public TestModel()
        {
            this.collection = new List<TestEntity>() {
            new TestEntity() { File = "MySong.mp3", Rating = 5 },
            new TestEntity() { File = "Heart.mp3", Rating = 5 },
            new TestEntity() { File = "KarmaPolice.mp3", Rating = 5 }};

            var observableCollection = Observable.Concat<TestEntity>(this.collection.Cast<IObservable<TestEntity>>());
            var filteredCollection = from entity in observableCollection
                                     where entity.Rating==0
                                     select entity;
            this.sub = filteredCollection.Subscribe(entity =>
                {
                    System.Diagnostics.Debug.WriteLine("Added :" + entity.File);
                }
            );
            this.collection[0].Rating = 0;
            this.collection[0].Rating = 5;
        }
    };
}
like image 278
Joe Wood Avatar asked Jan 18 '11 14:01

Joe Wood


3 Answers

Actually I found the Reactive-UI library helpful for this (available in NuGet). This library includes special IObservable subjects for collections and the facility to create one of these 'ReactiveCollections' over a a traditional INCC collection. Through this I have streams for new, removed items and changing items in the collection. I then use a Zip to merge the streams together and modify a target ViewModel observable collection. This provides a live projection based on a query on the source model.

The following code solved the problem (this code would be even simpler, but there are some problems with the Silverlight version of Reactive-UI that needed workarounds). The code fires collection changed events by simply adjusting the value of 'Rating' on one of the collection elements:

using System;
using System.ComponentModel;
using System.Linq;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using ReactiveUI;

namespace RxTest
{

    public class TestEntity :  ReactiveObject, INotifyPropertyChanged, INotifyPropertyChanging
    {
        public string _File;
        public int _Rating = 0;
        public string File
        {
            get { return _File; }
            set { this.RaiseAndSetIfChanged(x => x.File, value); }
        }

        public int Rating
        {
            get { return this._Rating; }
            set { this.RaiseAndSetIfChanged(x => x.Rating, value); }
        }

        public TestEntity()
        {
        }
    }

    public class TestModel
    {
        private IEnumerable<TestEntity> collection { get; set; }
        private IDisposable sub;

        public TestModel()
        {
            this.collection = new ObservableCollection<TestEntity>() {
            new TestEntity() { File = "MySong.mp3", Rating = 5 },
            new TestEntity() { File = "Heart.mp3", Rating = 5 },
            new TestEntity() { File = "KarmaPolice.mp3", Rating = 5 }};

            var filter = new Func<int, bool>( Rating => (Rating == 0));

            var target = new ObservableCollection<TestEntity>();
            target.CollectionChanged += new NotifyCollectionChangedEventHandler(target_CollectionChanged);
            var react = new ReactiveCollection<TestEntity>(this.collection);
            react.ChangeTrackingEnabled = true;

            // update the target projection collection if an item is added
            react.ItemsAdded.Subscribe( v => { if (filter.Invoke(v.Rating)) target.Add(v); } );
            // update the target projection collection if an item is removed (and it was in the target)
            react.ItemsRemoved.Subscribe(v => { if (filter.Invoke(v.Rating) && target.Contains(v)) target.Remove(v); });

            // track items changed in the collection.  Filter only if the property "Rating" changes
            var ratingChangingStream = react.ItemChanging.Where(i => i.PropertyName == "Rating").Select(i => new { Rating = i.Sender.Rating, Entity = i.Sender });
            var ratingChangedStream = react.ItemChanged.Where(i => i.PropertyName == "Rating").Select(i => new { Rating = i.Sender.Rating, Entity = i.Sender });
            // pair the two streams together for before and after the entity has changed.  Make changes to the target
            Observable.Zip(ratingChangingStream,ratingChangedStream, 
                (changingItem, changedItem) => new { ChangingRating=(int)changingItem.Rating, ChangedRating=(int)changedItem.Rating, Entity=changedItem.Entity})
                .Subscribe(v => { 
                    if (filter.Invoke(v.ChangingRating) && (!filter.Invoke(v.ChangedRating))) target.Remove(v.Entity);
                    if ((!filter.Invoke(v.ChangingRating)) && filter.Invoke(v.ChangedRating)) target.Add(v.Entity);
                });

            // should fire CollectionChanged Add in the target view model collection
            this.collection.ElementAt(0).Rating = 0;
            // should fire CollectionChanged Remove in the target view model collection
            this.collection.ElementAt(0).Rating = 5;
        }

        void target_CollectionChanged(object sender, NotifyCollectionChangedEventArgs e)
        {
            System.Diagnostics.Debug.WriteLine(e.Action);
        }
    }
}
like image 196
Joe Wood Avatar answered Nov 04 '22 03:11

Joe Wood


What's wrong with using an ObservableCollection<T>? Rx is a very easy framework to overuse; I find that if you find yourself fighting against the basic premise of an asynchronous stream, you probably shouldn't be using Rx for that particular problem.

like image 34
Richard Szalay Avatar answered Nov 04 '22 05:11

Richard Szalay


All of the INPC implementations that I've ever seen can be best labeled as shortcuts or hacks. However, I can't really fault the developers since the INPC mechanism that the .NET creators choose to support is terrible. With that said, I have recently discovered, in my opinion, the best implementation of INPC, and the best compliment to any MVVM framework around. In addition to providing dozens of extremely helpful functions and extensions, it also sports the most elegant INPC pattern I've seen. It somewhat resembles the ReactiveUI framework, but it wasn't designed to be a comprehensive MVVM platform. To create a ViewModel that supports INPC, it requires no base class, or interfaces, yes is still able to support complete change notification and Two Way binding, and best of all, all of your properties can be automatic!

It does NOT use a utility such as PostSharp or NotifyPropertyWeaver, but is built around the Reactive Extensions framework. The name of this new framework is ReactiveProperty. I suggest visiting the project site (on codeplex), and pulling down the NuGet package. Also, looks through the source code, because it's really a treat.

I'm in no way associated with the developer, and the project is still fairly new. I'm just really enthusiastic about the features it offers.

like image 31
Dylan Vester Avatar answered Nov 04 '22 03:11

Dylan Vester