Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Combining observables conditionally

I have two observables, one IObservable<AlertData> and the other IObservable<SoundRequestData>. AlertData contains a property Id, which knows which SoundRequestData belongs to it. SoundRequestData only knows about itself and has an Id property which can be matched to the one in AlertData.

I want to combine these two data types into a new type AlertDataViewModel. However, I cannot be sure that the order of the data that arrives in both observables are the same. I do not care about the order in the output either right now.

What I want is to match AlertData with SoundRequestData.

The way I am doing it now, which works but is slow, is to wait until one of the observables is done fetching all the data into a ObservableCollection. Afterwards I start the other observable and match up the Id's.

Is there a better way of doing this? I guess this could be expressed as the following marble diagram:

Imgur

So a.id=1 matches up to 3.id=1, b.id=2 matches up to 4.id=2 and so forth.

like image 438
Cheesebaron Avatar asked Oct 19 '22 05:10

Cheesebaron


1 Answers

First let's introduce a little extension method for IObserver<T>.

public static IObserver<T> Safe<T>(this IObserver<T> observer)
{
    var done = false;
    return Observer.Create<TResult>(
        value =>
        {
            if (!done)
            {
                observer.OnNext(value);
            }
        },
        error =>
        {
            if (!done)
            {
                done = true;
                observer.OnError(error);
            }
        },
        () =>
        {
            if (!done)
            {
                done = true;
                observer.OnCompleted();
            }
        });
}

This just ensures that the observer is called in the pattern OnNext*(OnError|OnCompleted) and that violations of that are just ignored.

We can now implement the operator you have described by buffering values from both sequences by key and only emitting them when we have a key match between the two sequences.

public static IObservable<TResult> Join<T1, T2, TKey, TResult>(
    IObservable<T1> source1,
    IObservable<T2> source2,
    Func<T1, TKey> key1,
    Func<T2, TKey> key2,
    Func<T1, T2, TResult> selector)
{
    return Observable.Create<TResult>(observer =>
    {
        var dict1 = new Dictionary<TKey, T1>();
        var dict2 = new Dictionary<TKey, T2>();
        var gate = new object();
        var safeObserver = observer.Safe();
        Action<TKey> emit = k =>
        {
            T1 value1;
            T2 value2;
            if (dict1.TryGetValue(k, out value1) && dict2.TryGetValue(k, out value2))
            {
                var result = selector(value1, value2);
                safeObserver.OnValue(result);
                dict1.Remove(k);
                dict2.Remove(k);
            }
        };
        return new CompositeDisposable(
            source1.Synchronize(gate).Subscribe(
                value1 =>
                {
                    var k = key1(value1);
                    dict1[k] = value1;
                    emit(k);
                },
                safeObserver.OnError,
                safeObserver.OnCompleted),
            source2.Synchronize(gate).Subscribe(
                value2 =>
                {
                    var k = key2(value2);
                    dict2[k] = value2;
                    emit(k);
                },
                safeObserver.OnError,
                safeObserver.OnCompleted));
    });
}

Example:

IObservable<AlertData> alertDatas = ...;
IObservable<SoundRequestData> = soundRequestDatas = ...;
IObservable<AlertDataViewModel> alertDataViewModels = Join(
    alertDatas,
    soundRequestDatas,
    alertData => alertData.Id,
    soundRequestData => soundRequestData.Id,
    (alertData, soundRequestData) => new AlertDataViewModel
    {
        AlertData = alertData,
        SoundRequestData = soundRequestData
    });
like image 122
Timothy Shields Avatar answered Oct 30 '22 13:10

Timothy Shields