I have an IObservable<IObservable<T>>
where each inner IObservable<T>
is a stream of values followed by an eventual OnCompleted
event.
I would like to transform this into an IObservable<IEnumerable<T>>
, a stream consisting of the latest value from any inner stream that is not completed. It should produce a new IEnumerable<T>
whenever a new value is produced from one of the inner streams (or an inner stream expires)
It is most easily shown with a marble diagram (which I hope is comprehensive enough):
input ---.----.---.----------------
| | '-f-----g-|
| 'd------e---------|
'a--b----c-----|
result ---a--b-b--c-c-c-e-e-e---[]-
d d d e f g
f f
([]
is an empty IEnumerable<T>
and -|
represents the OnCompleted)
You can see that it slightly resembles a CombineLatest
operation.
I have been playing around with Join
and GroupJoin
to no avail but I feel that that is almost certainly the right direction to be heading in.
I would like to use as little state as possible in this operator.
I have updated this question to include not just single-valued sequences - the resultant IObservable<IEnumerable<T>>
should include only the latest value from each sequence - if a sequence has not produced a value, it should not be included.
Here's a version based your solution yesterday, tweaked for the new requirements. The basic idea is to just put a reference into your perishable collection, and then update the value of the reference as the inner sequence produces new values.
I also modified to properly track the inner subscriptions and unsubscribe if the outer observable is unsubscribed.
Also modified to tear it all down if any of the streams produce an error.
Finally, I fixed some race conditions that could violate Rx Guidelines. If your inner observables are firing concurrently from different threads, you could wind up call obs.OnNext
concurrently which is a big no-no. So I've gated each inner observable using the same lock to prevent that (see the Synchronize
call). Note that because of this, you could probably get away with using a regular double linked list instead of the PerishableCollection
because now all of the code using the collection is within a lock so you don't need the threading guarantees of the PerishableCollection
.
// Acts as a reference to the current value stored in the list
private class BoxedValue<T>
{
public T Value;
public BoxedValue(T initialValue) { Value = initialValue; }
}
public static IObservable<IEnumerable<T>> MergeLatest<T>(this IObservable<IObservable<T>> source)
{
return Observable.Create<IEnumerable<T>>(obs =>
{
var collection = new PerishableCollection<BoxedValue<T>>();
var outerSubscription = new SingleAssignmentDisposable();
var subscriptions = new CompositeDisposable(outerSubscription);
var innerLock = new object();
outerSubscription.Disposable = source.Subscribe(duration =>
{
BoxedValue<T> value = null;
var lifetime = new DisposableLifetime(); // essentially a CancellationToken
var subscription = new SingleAssignmentDisposable();
subscriptions.Add(subscription);
subscription.Disposable = duration.Synchronize(innerLock)
.Subscribe(
x =>
{
if (value == null)
{
value = new BoxedValue<T>(x);
collection.Add(value, lifetime.Lifetime);
}
else
{
value.Value = x;
}
obs.OnNext(collection.CurrentItems().Select(p => p.Value.Value));
},
obs.OnError, // handle an error in the stream.
() => // on complete
{
if (value != null)
{
lifetime.Dispose(); // removes the item
obs.OnNext(collection.CurrentItems().Select(p => p.Value.Value));
subscriptions.Remove(subscription); // remove this subscription
}
}
);
});
return subscriptions;
});
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With