Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do I combine a dynamic collection of Observables into a new Observable?

I have the concept of a Room in my program. Each room has a number of Users on it, and each user has an Observable that states what level of silence he wants in the room. When the user selects a new value, I just push it to the Observable by calling OnNext on an internal BehaviorSubject instance inside the User class.

The main logic in the program is that one should be able to tell "what level of silence" the room needs. For example, if at least one user in the room needs silence, then the entire room needs silence.

This is a simplification of my classes:

First, I'm using an enum to represent the possible sound levels. Right now there is only two values:

enum SoundLevelRequirement
{
    /// <summary>
    ///     Indicates that the user is ok with any sound level in the room.
    /// </summary>
    None,

    /// <summary>
    ///     Indicates that the user needs absolute silence in the room.
    /// </summary>
    Silence
}

The user just exposes an Observable that ticks whenever his status has changed.

class User
{
    private readonly BehaviorSubject<SoundLevelRequirement> soundLevel;

    public User(string name)
    {
        Name = name;
        soundLevel = new BehaviorSubject<SoundLevelRequirement>(SoundLevelRequirement.None);
    }

    public string Name { get; }

    public IObservable<SoundLevelRequirement> SoundLevelChanged => soundLevel.DistinctUntilChanged();

    public void SetSoundLevel(SoundLevelRequirement level)
    {
        soundLevel.OnNext(level);
    }
}

And the room class is basically a container for users, and should have it's own observable to represent the overall status of the whole room:

class Room
{
    private readonly BehaviorSubject<SoundLevelRequirement> soundLevel;
    private readonly ObservableCollection<User> users;

    public Room(string name)
    {
        Name = name;
        Users = new ObservableCollection<User>();

        // THIS IS WHERE I NEED TO COMBINE ALL CHILD OBSERVABLES INSTEAD 
        // OF USING ANOTHER Subject
        soundLevel = new BehaviorSubject<SoundLevelRequirement>(SoundLevelRequirement.None);
    }

    public ObservableCollection<User> Users { get; set; }

    public string Name { get; }

    public IObservable<SoundLevelRequirement> SoundLevel => soundLevel.DistinctUntilChanged();
}

I'm having trouble in combining the users Observables using Rx directly into another Observable, because of the dynamic nature of the solution. My main problem is that users can leave or join a room at any moment (new User objects can be added to or removed from the room), so I don't have a static list of observables to use. If I had a static list, it would be very easy to achieve by leveraging CombineLatest, and then doing my filtering logic on it, like this:

Users.Select(u => u.SoundLevelChanged).CombineLatest().Select(latest => latest.Max());

This way, whenever any user status changes, I just have to see what the "biggest value" is to determine the room's status. But as soon as I add or remove users I need to keep it in sync with the observable, so this does not work.

I also need to make sure to handle whenever users leave the room appropriately. For example, if a room of 5 people is in the "Silence" state due to one user having selected Silence, I have to reset the room state when that user leaves and set it back to None.

I have thought about using the ObservableCollection there to monitor for additions and removals, but I can't come up with something without having to recreate the observable, which obviously will not work since there are people subscribed to the changes already.

like image 819
julealgon Avatar asked Sep 01 '15 15:09

julealgon


1 Answers

I would try to make this simple if you can. You need a simple change to the class User to do it.

User needs to add this property:

public SoundLevelRequirement SoundLevel => soundLevel.Value;

Now you can implement Room like this:

class Room
{
    private readonly IObservable<SoundLevelRequirement> soundLevel;

    public Room(string name)
    {
        this.Name = name;
        this.Users = new ObservableCollection<User>();

        soundLevel =
            Observable
                .Create<SoundLevelRequirement>(o =>
                    Observable
                        .FromEventPattern<
                            NotifyCollectionChangedEventHandler,
                            NotifyCollectionChangedEventArgs>(
                            h => this.Users.CollectionChanged += h,
                            h => this.Users.CollectionChanged -= h)
                        .SelectMany(x => this.Users
                            .Select(u => u.SoundLevelChanged
                                .StartWith(u.SoundLevel))
                                .Merge())
                        .Select(ep =>
                            this.Users.Any(u =>
                                u.SoundLevel == SoundLevelRequirement.Silence)
                                    ? SoundLevelRequirement.Silence
                                    : SoundLevelRequirement.None)
                        .DistinctUntilChanged()
                        .Subscribe(o));
    }

    public ObservableCollection<User> Users { get; set; }

    public string Name { get; }

    public IObservable<SoundLevelRequirement> SoundLevel => soundLevel.AsObservable();
}

If you need to have the Room.SoundLevel have a replay of 1 then add in .Replay(1) to the soundLevel field, but it'll have to become IConnectableObservable<SoundLevelRequirement> to make that work. You'll need to then implement IDisposable on Room to dispose of the connection. This is the right way to do it - you should avoid subjects where possible. They'll just make your code hard to work with.

like image 150
Enigmativity Avatar answered Oct 18 '22 16:10

Enigmativity