I have two observables, one IObservable<AlertData> and the other IObservable<SoundRequestData>. AlertData contains a property Id, which identifies the SoundRequestData that belongs to it. SoundRequestData only knows about itself and has an Id property which can be matched to the one in AlertData.
I want to combine these two data types into a new type, AlertDataViewModel. However, I cannot be sure that the data arrives in the same order in both observables. I do not care about the order of the output either right now.
What I want is to match AlertData with SoundRequestData.
The way I am doing it now, which works but is slow, is to wait until one of the observables has finished fetching all of its data into an ObservableCollection. Afterwards I start the other observable and match up the Ids.
Is there a better way of doing this? I guess this could be expressed as the following marble diagram:
So a.id=1 matches up to 3.id=1, b.id=2 matches up to 4.id=2, and so forth.
First let's introduce a little extension method for IObserver<T>.
    public static IObserver<T> Safe<T>(this IObserver<T> observer)
    {
        // Set once a terminal notification (OnError/OnCompleted) has been forwarded.
        var done = false;
        return Observer.Create<T>(
            value =>
            {
                if (!done)
                {
                    observer.OnNext(value);
                }
            },
            error =>
            {
                if (!done)
                {
                    done = true;
                    observer.OnError(error);
                }
            },
            () =>
            {
                if (!done)
                {
                    done = true;
                    observer.OnCompleted();
                }
            });
    }
This just ensures that the observer is called in the pattern OnNext*(OnError|OnCompleted) and that any calls violating that pattern are ignored.
We can now implement the operator you have described by buffering values from both sequences by key and only emitting them when we have a key match between the two sequences.
    public static IObservable<TResult> Join<T1, T2, TKey, TResult>(
        IObservable<T1> source1,
        IObservable<T2> source2,
        Func<T1, TKey> key1,
        Func<T2, TKey> key2,
        Func<T1, T2, TResult> selector)
    {
        return Observable.Create<TResult>(observer =>
        {
            // Values from each source that are still waiting for a partner, by key.
            var dict1 = new Dictionary<TKey, T1>();
            var dict2 = new Dictionary<TKey, T2>();
            // Shared lock so callbacks from the two sources never run concurrently.
            var gate = new object();
            var safeObserver = observer.Safe();
            // Emits a result once both sources have produced a value for key k.
            Action<TKey> emit = k =>
            {
                T1 value1;
                T2 value2;
                if (dict1.TryGetValue(k, out value1) && dict2.TryGetValue(k, out value2))
                {
                    var result = selector(value1, value2);
                    safeObserver.OnNext(result);
                    dict1.Remove(k);
                    dict2.Remove(k);
                }
            };
            // The result terminates as soon as either source errors or completes.
            return new CompositeDisposable(
                source1.Synchronize(gate).Subscribe(
                    value1 =>
                    {
                        var k = key1(value1);
                        dict1[k] = value1;
                        emit(k);
                    },
                    safeObserver.OnError,
                    safeObserver.OnCompleted),
                source2.Synchronize(gate).Subscribe(
                    value2 =>
                    {
                        var k = key2(value2);
                        dict2[k] = value2;
                        emit(k);
                    },
                    safeObserver.OnError,
                    safeObserver.OnCompleted));
        });
    }
Example:
    IObservable<AlertData> alertDatas = ...;
    IObservable<SoundRequestData> soundRequestDatas = ...;
    IObservable<AlertDataViewModel> alertDataViewModels = Join(
        alertDatas,
        soundRequestDatas,
        alertData => alertData.Id,
        soundRequestData => soundRequestData.Id,
        (alertData, soundRequestData) => new AlertDataViewModel
        {
            AlertData = alertData,
            SoundRequestData = soundRequestData
        });
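If it helps to see the matching rule on its own, here is a minimal sketch of the same buffer-by-key idea without Rx, threading, or completion handling. KeyMatcher and Demo are hypothetical names made up for this illustration; they are not part of Rx or of the code above, and the values a, b, 3 and 4 are taken from the marble description in the question.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of Rx): pairs values from two sides by key,
// buffering whichever side arrives first until its partner shows up.
public sealed class KeyMatcher<TKey, T1, T2>
{
    private readonly Dictionary<TKey, T1> _left = new Dictionary<TKey, T1>();
    private readonly Dictionary<TKey, T2> _right = new Dictionary<TKey, T2>();

    // Returns true (and the pair) if the right-hand partner was already buffered;
    // otherwise buffers the left value and returns false.
    public bool TryAddLeft(TKey key, T1 value, out (T1, T2) pair)
    {
        if (_right.TryGetValue(key, out var other))
        {
            _right.Remove(key);
            pair = (value, other);
            return true;
        }
        _left[key] = value;
        pair = default;
        return false;
    }

    // Mirror image of TryAddLeft for values arriving on the right-hand side.
    public bool TryAddRight(TKey key, T2 value, out (T1, T2) pair)
    {
        if (_left.TryGetValue(key, out var other))
        {
            _left.Remove(key);
            pair = (other, value);
            return true;
        }
        _right[key] = value;
        pair = default;
        return false;
    }
}

public static class Demo
{
    // Feeds the two sides in different orders and records each match as "left+right".
    public static List<string> Run()
    {
        var matcher = new KeyMatcher<int, string, string>();
        var output = new List<string>();

        if (matcher.TryAddLeft(1, "a", out var p1)) output.Add(p1.Item1 + "+" + p1.Item2);
        if (matcher.TryAddRight(2, "4", out var p2)) output.Add(p2.Item1 + "+" + p2.Item2);
        if (matcher.TryAddLeft(2, "b", out var p3)) output.Add(p3.Item1 + "+" + p3.Item2);
        if (matcher.TryAddRight(1, "3", out var p4)) output.Add(p4.Item1 + "+" + p4.Item2);

        return output;
    }

    public static void Main()
    {
        // Prints "b+4" then "a+3": pairs come out in the order they complete.
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

A pair is produced as soon as its second half arrives, so the output order follows completion order rather than the order of either input. Unlike the Rx version, this sketch does no locking, so it is only meant for reasoning about the matching rule.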