I need to implement a version of CombineLatest (I'll call it WithLatest here) that calls the selector for every item on the left and the latest item on the right. It shouldn't push when only the item on the right changes.
I think whether this is built with Observable.Create or as a combination of existing extensions is not particularly important; I'll be making this a "boxed" extension method either way.
Example
var left = new Subject<int>();
var right = new Subject<int>();
left.WithLatest(right, (l,r) => l + " " + r).Dump();
left.OnNext(1); // <1>
left.OnNext(2); // <2>
right.OnNext(1); // <3>
right.OnNext(2); // <4>
left.OnNext(3); // <5>
should yield
2 1
3 2
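Edit: The logic of my example goes:
<1>, <2>: Left is updated, but right is still empty, so nothing is pushed.
<3>: Right is populated with 1, so left = 2 (the latest value) and right = 1 are pushed. Up to this point there is no difference between WithLatest and CombineLatest.
<4>: Right is updated, but nothing is pushed. This is where the two differ.
<5>: Left is updated with 3, so left = 3 and right = 2 (the latest value) are pushed.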
It's been suggested that I try:
var lr = right.ObserveOn(Scheduler.TaskPool).Latest();
left.Select(l => l + " " + lr.First()).Dump();
but this blocks on the current thread for my test.
You can do this using existing operators.
Func<int, int, string> selector = (l, r) => l + " " + r;
var query = right.Publish(rs => left.Zip(rs.MostRecent(0), selector).SkipUntil(rs));
Publish ensures we only ever subscribe to right once and share the subscription among all subscribers to rs.
MostRecent turns an IObservable<T> into an IEnumerable<T> that always yields the most recently emitted value from the source observable.
Zip between IObservable<T> and IEnumerable<U> emits a value each time the observable emits a value.
SkipUntil skips the pairs (l, r) which occur before right ever emits a value.
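Since the question mentions Observable.Create as an option, here is a minimal sketch of that route for comparison. It is not the code from any of the answers here. The class name WithLatestExtensions, the gate lock, and the completion policy (complete when left completes, ignore completion of right) are assumptions made for illustration. Unlike the Zip/MostRecent query above, this sketch also pushes once when right produces its first value after left already has one, which is what the "2 1" line in the question's expected output asks for.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class WithLatestExtensions
{
    // Hypothetical helper: pushes for every left item once right has a value,
    // and pushes once when right delivers its first value after left has one.
    public static IObservable<TResult> WithLatest<TLeft, TRight, TResult>(
        this IObservable<TLeft> left,
        IObservable<TRight> right,
        Func<TLeft, TRight, TResult> selector)
    {
        return Observable.Create<TResult>(observer =>
        {
            var gate = new object();          // serializes access from both sources
            var hasLeft = false;
            var hasRight = false;
            var latestLeft = default(TLeft);
            var latestRight = default(TRight);

            var rightSubscription = right.Subscribe(
                r =>
                {
                    lock (gate)
                    {
                        var isFirstRight = !hasRight;
                        latestRight = r;
                        hasRight = true;
                        // Only the very first right value may trigger a push.
                        if (isFirstRight && hasLeft)
                            observer.OnNext(selector(latestLeft, r));
                    }
                },
                ex => { lock (gate) observer.OnError(ex); });

            var leftSubscription = left.Subscribe(
                l =>
                {
                    lock (gate)
                    {
                        latestLeft = l;
                        hasLeft = true;
                        if (hasRight)
                            observer.OnNext(selector(l, latestRight));
                    }
                },
                ex => { lock (gate) observer.OnError(ex); },
                () => { lock (gate) observer.OnCompleted(); });

            return new CompositeDisposable(leftSubscription, rightSubscription);
        });
    }
}
```

With the subjects from the question, left.WithLatest(right, (l, r) => l + " " + r) should push "2 1" at step <3> and "3 2" at step <5>, and nothing at step <4>. If the initial "2 1" push is not wanted, dropping the isFirstRight branch leaves a version that pushes only when left changes.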
I also had the same need for a CombineLatest which "pushes only for the left".
I made the solution an "overload" of Observable.Sample, because that's what the method does: it samples a source (right) with a sampler (left), with the additional capability of providing a resultSelector (like in CombineLatest).
public static IObservable<TResult> Sample<TSource, TSample, TResult>(
    this IObservable<TSource> source,
    IObservable<TSample> sampler,
    Func<TSource, TSample, TResult> resultSelector)
{
    var multiSampler = sampler.Publish().RefCount();
    return source.CombineLatest(multiSampler, resultSelector).Sample(multiSampler);
}
Based on the solution picked by the post author I think there's an even simpler solution utilizing DistinctUntilChanged:
public static IObservable<TResult> CombineLatestOnLeft<TLeft, TRight, TResult>(
    this IObservable<TLeft> leftSource,
    IObservable<TRight> rightSource,
    Func<TLeft, TRight, TResult> selector)
{
    return leftSource
        .Select<TLeft, Tuple<TLeft, int>>(Tuple.Create<TLeft, int>)
        .CombineLatest(rightSource,
            (l, r) => new { Index = l.Item2, Left = l.Item1, Right = r })
        .DistinctUntilChanged(x => x.Index)
        .Select(x => selector(x.Left, x.Right));
}
or even
public static IObservable<TResult> CombineLatestOnLeft<TLeft, TRight, TResult>(
    this IObservable<TLeft> leftSource,
    IObservable<TRight> rightSource,
    Func<TLeft, TRight, TResult> selector)
{
    return leftSource
        .CombineLatest(rightSource,
            (l, r) => new { Left = l, Right = r })
        .DistinctUntilChanged(x => x.Left)
        .Select(x => selector(x.Left, x.Right));
}
if you only care about distinct values of leftSource (consecutive equal values on the left are then treated as no change).