"Merging" a stream of streams to produce a stream of the latest values of each

I have an IObservable<IObservable<T>> where each inner IObservable<T> is a stream of values followed by an eventual OnCompleted event.

I would like to transform this into an IObservable<IEnumerable<T>>: a stream of the latest value from each inner stream that has not yet completed. It should produce a new IEnumerable<T> whenever one of the inner streams produces a new value (or an inner stream completes).

It is most easily shown with a marble diagram (which I hope is comprehensive enough):

input ---.----.---.----------------
         |    |   '-f-----g-|      
         |    'd------e---------|
         'a--b----c-----|          

result ---a--b-b--c-c-c-e-e-e---[]-
               d  d d e f g        
                    f f            

([] is an empty IEnumerable<T>, and -| represents OnCompleted.)

You can see that it slightly resembles a CombineLatest operation. I have been playing around with Join and GroupJoin to no avail, but I feel that they are almost certainly the right direction to be heading in.

I would like to use as little state as possible in this operator.

Update

I have updated this question to cover inner sequences that produce more than one value. The resulting IObservable<IEnumerable<T>> should include only the latest value from each sequence; if a sequence has not yet produced a value, it should not be included.
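To pin down the required semantics independently of Rx, here is a minimal sketch that replays the marble diagram as a flat, time-ordered list of (stream, value) events, where a null value stands for OnCompleted. It assumes each snapshot lists values in the order their streams first produced a value, which is what the diagram shows; the question does not state an ordering explicitly. MergeLatestModel and Replay are hypothetical names used only for illustration, not part of Rx.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

public static class MergeLatestModel
{
    // Replays (stream, value) events in time order; a null value means that
    // stream completed. Returns the snapshot the operator should emit after each event.
    public static List<List<string>> Replay(IEnumerable<(int Stream, string? Value)> events)
    {
        var order = new List<int>();                 // streams in first-value order
        var latest = new Dictionary<int, string>();  // stream -> latest value
        var outputs = new List<List<string>>();

        foreach (var (stream, value) in events)
        {
            if (value is string v)
            {
                if (!latest.ContainsKey(stream)) order.Add(stream);
                latest[stream] = v;
            }
            else
            {
                // A stream that completes without ever producing a value emits nothing.
                if (!latest.Remove(stream)) continue;
                order.Remove(stream);
            }
            outputs.Add(order.Select(id => latest[id]).ToList());
        }
        return outputs;
    }

    public static void Main()
    {
        // The question's diagram: stream 1 = a b c |, stream 2 = d e |, stream 3 = f g |
        var events = new (int, string?)[]
        {
            (1, "a"), (1, "b"), (2, "d"), (1, "c"), (3, "f"),
            (2, "e"), (1, null), (3, "g"), (3, null), (2, null),
        };
        foreach (var snapshot in Replay(events))
            Console.WriteLine("[" + string.Join(", ", snapshot) + "]");
    }
}
```

Running it prints one line per emission, matching the result row of the diagram: [a], [b], [b, d], [c, d], [c, d, f], [c, e, f], [e, f], [e, g], [e], [].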

asked Jul 28 '13 17:07 by Alex



1 Answer

Here's a version based on your solution from yesterday, tweaked for the new requirements. The basic idea is to put a reference (a box) into your perishable collection and then update the value inside that reference as the inner sequence produces new values.

I also modified it to track the inner subscriptions properly and to unsubscribe from them when the subscription to the outer observable is disposed.

I also modified it to tear everything down if any of the streams produces an error.

Finally, I fixed some race conditions that could violate the Rx guidelines. If your inner observables fire concurrently on different threads, you could wind up calling obs.OnNext concurrently, which is a big no-no. So I've gated each inner observable with the same lock to prevent that (see the Synchronize call). Because of this, you could probably get away with a regular doubly linked list instead of the PerishableCollection: all of the code that uses the collection now runs inside the lock, so you no longer need the PerishableCollection's thread-safety guarantees.

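// Requires the System.Reactive (Rx) package. PerishableCollection<T> and DisposableLifetime
// are assumed to come from the Twisted Oak libraries used in the earlier solution.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class MergeLatestExtensions
{
    // Acts as a reference to the current value stored in the list
    private class BoxedValue<T>
    {
        public T Value;
        public BoxedValue(T initialValue) { Value = initialValue; }
    }

    public static IObservable<IEnumerable<T>> MergeLatest<T>(this IObservable<IObservable<T>> source)
    {
        return Observable.Create<IEnumerable<T>>(obs =>
        {
            var collection = new PerishableCollection<BoxedValue<T>>();
            var outerSubscription = new SingleAssignmentDisposable();
            var subscriptions = new CompositeDisposable(outerSubscription);
            var innerLock = new object();

            outerSubscription.Disposable = source.Subscribe(
                duration =>
                {
                    BoxedValue<T> value = null;
                    var lifetime = new DisposableLifetime(); // essentially a CancellationToken
                    var subscription = new SingleAssignmentDisposable();

                    subscriptions.Add(subscription);
                    subscription.Disposable = duration.Synchronize(innerLock)
                        .Subscribe(
                            x =>
                            {
                                if (value == null)
                                {
                                    value = new BoxedValue<T>(x);
                                    collection.Add(value, lifetime.Lifetime);
                                }
                                else
                                {
                                    value.Value = x;
                                }
                                // ToList() takes the snapshot while the lock is held; a lazy
                                // Select would read the boxes later, outside the lock.
                                obs.OnNext(collection.CurrentItems().Select(p => p.Value.Value).ToList());
                            },
                            obs.OnError, // handle an error in an inner stream
                            () => // on complete
                            {
                                lifetime.Dispose(); // removes the item, if one was added
                                if (value != null)
                                {
                                    obs.OnNext(collection.CurrentItems().Select(p => p.Value.Value).ToList());
                                }
                                // Remove (and dispose) this subscription even if the
                                // stream completed without producing a value.
                                subscriptions.Remove(subscription);
                            });
                },
                // Forward an error in the outer stream under the same lock.
                // (Completion of the outer stream is not propagated.)
                ex => { lock (innerLock) { obs.OnError(ex); } });

            return subscriptions;
        });
    }
}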
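The answer points out that, once every access to the collection happens inside the Synchronize lock, a regular doubly linked list could replace PerishableCollection. Below is a minimal sketch of that idea using System.Collections.Generic.LinkedList<T>; LatestValues<T> and Slot are hypothetical names, and the class assumes its caller always holds the lock. Each inner stream keeps the Slot returned by Add, updates Slot.Value in place on later values (so its position in the snapshot does not change), and passes it to Remove on completion.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical lock-protected replacement for PerishableCollection<BoxedValue<T>>.
// Not thread-safe by itself: every call must happen while holding the same gate
// that serializes the inner streams (e.g. the Synchronize(innerLock) gate).
public sealed class LatestValues<T>
{
    // Plays the role of BoxedValue<T>: a mutable reference to one stream's latest value.
    public sealed class Slot
    {
        internal LinkedListNode<Slot>? Node;
        public T Value { get; set; }
        internal Slot(T value) { Value = value; }
    }

    private readonly LinkedList<Slot> _slots = new LinkedList<Slot>();

    // O(1): called when an inner stream produces its first value.
    public Slot Add(T value)
    {
        var slot = new Slot(value);
        slot.Node = _slots.AddLast(slot);
        return slot;
    }

    // O(1): called when the inner stream completes; removing twice is a no-op.
    public void Remove(Slot slot)
    {
        if (slot.Node is null) return;
        _slots.Remove(slot.Node);
        slot.Node = null;
    }

    // Copies the current values so an emitted IEnumerable<T> never changes later.
    public List<T> Snapshot() => _slots.Select(s => s.Value).ToList();
}

public static class LatestValuesDemo
{
    public static void Main()
    {
        var gate = new object();
        var values = new LatestValues<string>();
        lock (gate)
        {
            var first = values.Add("a");
            values.Add("d");
            first.Value = "c"; // in-place update keeps the slot's position
            Console.WriteLine(string.Join(", ", values.Snapshot())); // c, d
            values.Remove(first);
            Console.WriteLine(string.Join(", ", values.Snapshot())); // d
        }
    }
}
```

Inside MergeLatest, the inner OnNext handler would call Add on the first value and assign Slot.Value afterwards, then emit Snapshot(); OnCompleted would call Remove and emit another Snapshot() only if a slot was ever created. Because Snapshot() returns a copy, observers never see the list change underneath them.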
answered Sep 29 '22 20:09 by Brandon
