Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to combine n observables dynamically into a list?

I have a collection of observables that generate state changes for a so-called Channel. And I have a ChannelSet that should monitor those channels.

I would like to write something like this: if one channel is operational, the channel set is up, else, the channel set is down.

IEnumerable<ChannelState> channelStates = ...;
if (channelStates.Any(cs => cs == ChannelState.Operational))
    channelSet.ChannelSetState = ChannelSetState.Up;
else
    channelSet.ChannelSetState = ChannelSetState.Down;

But where do I get my IEnumerable<ChannelState>? If I have 1 channel, I can simply subscribe to its state changes and modify the state of the channel set accordingly. For two channels, I could use CombineLatest:

Observable.CombineLatest(channel0States, channel1States, (cs0, cs1) =>
    {
        if (cs0 == ChannelSetState.Up || cs1 == ChannelSetState.Up)
            return ChannelSetState.Up;
        else
            return ChannelSetState.Down;
    });

But I have an IEnumerable<Channel> and a corresponding IEnumerable<IObservable<ChannelState>>. I'm looking for something like CombineLatest that is not limited to a fixed number of observables.

To complicate matters, the collection of channels can be added to and removed from. So once in a while, a channel will be added for example. The new channel also generates state changes that need to be incorporated.

So what I'm actually looking for is a function:

IEnumerable<IObservable<ChannelState>> --> IObservable<ChannelSetState>

that keeps up-to-date when the input changes. There should be some way to accomplish this using Rx but I can't really figure out how.

like image 870
Ronald Wildenberg Avatar asked Jul 25 '11 05:07

Ronald Wildenberg


2 Answers

There's a fairly straight forward way to do what you want with Rx, but you need to think in terms of observables only and not mix in enumerables.

The function signature that you really need to think in terms of is:

IObservable<IObservable<ChannelState>> --> IObservable<ChannelSetState>

Here's the function:

Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
    channelStates =>
        channelStates
            .Merge()
            .Select(cs => cs == ChannelState.Operational ? 1 : -1)
            .Scan(0, (cssn, csn) => cssn + csn)
            .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
            .DistinctUntilChanged();

It is important that each IObservable<ChannelState> in the IObservable<IObservable<ChannelState>> behaves properly for this to work.

I've assumed that the ChannelState enum has an Idle state and that each IObservable<ChannelState> will produce zero or more pairs of Operational/Idle values (Operational followed by Idle) before completing.

Also you said "the collection of channels can be added to and removed from" - thinking in terms of IEnumerable<IObservable<ChannelState>> this sounds reasonable - but in Rx you don't have to worry about removes because each observable can signal its own completion. Once it signals completion then it is as if it has been removed from the collection because it can not produce any further values. So you only need to worry about adding to the collection - this is easy using subjects.

So now the function can be used like so:

var channelStatesSubject = new Subject<IObservable<ChannelState>>();
var channelStates = channelStatesSubject.AsObservable();
var channelSetStates = f(channelStates);

channelSetStates.Subscribe(css => { /* ChannelSetState subscription code */ });

channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
// etc

I ran this using some test code, that used three random ChannelState observables, with a Do call in the f function for debugging, and got the following sequence:

1
Up
2
3
2
1
2
1
0
Down
1
Up
0
Down

I think that's what you're after. Let me know if I've missed anything.


As per the comments below, the ChannelState enum has multiple states, but only Operational means that the connection is up. So it's very easy to add a DistinctUntilChanged operator to hide multiple "down" states. Here's the code now:

Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
    channelStates =>
        channelStates
            .Merge()
            .Select(cs => cs == ChannelState.Operational ? 1 : -1)
            .DistinctUntilChanged()
            .Scan(0, (cssn, csn) => cssn + csn)
            .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
            .DistinctUntilChanged();

Added code to ensure that the first select query always starts with a 1. Here's the code now:

Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
    channelStates =>
        channelStates
            .Merge()
            .Select(cs => cs == ChannelState.Operational ? 1 : -1)
            .StartWith(1)
            .DistinctUntilChanged()
            .Scan(0, (cssn, csn) => cssn + csn)
            .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
            .DistinctUntilChanged();
like image 73
Enigmativity Avatar answered Dec 18 '22 10:12

Enigmativity


Perhaps start with an IObservable<Channel> rather than starting w/ IEnumerable<Channel>. A way to do this would be to use a Subject<Channel>, and when a new one is created, OnNext() it.

If you need a list,

xsChannels.Subscribe(item => { lock(list) { list.add(item); } });

like image 44
Scott Weinstein Avatar answered Dec 18 '22 11:12

Scott Weinstein