Transform an Observable when other Observables emit a mapping function

I'm creating a game in which there's an observable stream of events X representing products delivered by a manufacturer. There are also external events (let's call them Transformers) that affect the manufacturer's performance in various ways and for various time periods. I want to represent each of them as another observable that emits a function which transforms X and which should be applied to each X until the Transformer's OnCompleted. The number of Transformers is not known upfront: they are created as a result of user actions (like an equipment purchase) or generated randomly (like equipment failures).

I guess I need an IObservable<IObservable<Func<X,X>>> that I have to Join (Zip, something else?) with IObservable<X> to do this. Can you help me with that? Observable.CombineLatest is almost what I need but it takes an IEnumerable<IObservable<T>>.

If my description is unclear, here's a marble diagram: [image: marble diagram]

In more abstract terms, what I need is quite analogous to a transposition of a matrix but instead of List<List<T>> I have IObservable<IObservable<T>>.

Pein asked Mar 17 '23 06:03


1 Answer

Assuming your transformers work on int and your observables are named like this:

IObservable<IObservable<Func<int, int>>> transformerObservables = null;
IObservable<int> values = null;

I would first convert the observable of observables of transformers into an observable of transformer arrays, that is:

IObservable<IObservable<Func<int, int>>> -> IObservable<Func<int, int>[]>

Ultimately we want to add transformers to a list and remove them again. To make sure the right transformer is removed, we can't rely on the usual comparison mechanism of Func<...> alone, so we tag each transformer with the index of the observable it came from and compare on that as well. So we...

// Needs: using System; using System.Linq; using System.Reactive.Concurrency; using System.Reactive.Linq;
var transformerArrayObservable = transformerObservables
    // ...attach to each transformer the index of the observable it came from:
    .Select((transformerObservable, index) => transformerObservable
        .Select(transformer => Tuple.Create(index, transformer))
        // Then materialize the transformer sequence so we get notified when the sequence terminates.
        .Materialize()
        // Now the fun part: make a scan, resulting in an observable of pairs
        // that hold the previous and the current transformer.
        .Scan(new
        {
            Previous = (Tuple<int, Func<int, int>>)null,
            Current = (Tuple<int, Func<int, int>>)null
        },
        (tuple, currentTransformer) => new
        {
            Previous = tuple.Current,
            // A terminating notification carries no value, so Current becomes null.
            Current = currentTransformer.HasValue
                ? currentTransformer.Value
                : (Tuple<int, Func<int, int>>)null
        }))
    // Merge these and do another scan, this time adding and removing
    // the transformers from a list.
    .Merge()
    .Scan(
        new Tuple<int, Func<int, int>>[0],
        (array, tuple) =>
        {
            // Expensive! Consider taking a dependency on immutable collections here!
            var list = array.ToList();

            if (tuple.Previous != null)
                list.Remove(tuple.Previous);

            if (tuple.Current != null)
                list.Add(tuple.Current);

            return list.ToArray();
        })
    // Extract only the actual functions.
    .Select(x => x.Select(y => y.Item2).ToArray())
    // Finally, to make sure that values are passed even when no transformer has been observed,
    // start this sequence with the neutral transformation (an empty array).
    // IMPORTANT: You should test what happens when the first value is observed very quickly. There might be timing issues.
    .StartWith(Scheduler.Immediate, new[] { new Func<int, int>[0] });
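
To illustrate why the index tag matters at the list.Remove step, here is a minimal sketch without Rx. The names TaggedRemovalDemo, RemoveFromSource, twice and plusOne are made up for the illustration; the only thing it relies on is that Tuple<,> compares its items, so two entries holding the same delegate but different indices are not equal.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class TaggedRemovalDemo
{
    // Removes the entry tagged with sourceIndex and returns the indices that remain.
    public static string RemoveFromSource(int sourceIndex)
    {
        Func<int, int> twice = x => x * 2;   // hypothetical transformer
        Func<int, int> plusOne = x => x + 1; // hypothetical transformer

        // The same delegate (twice) is currently active for sources 0 and 2.
        var active = new List<Tuple<int, Func<int, int>>>
        {
            Tuple.Create(0, twice),
            Tuple.Create(1, plusOne),
            Tuple.Create(2, twice),
        };

        // Tuple<,> equality compares both items, so only the entry
        // with the matching index is removed.
        active.Remove(Tuple.Create(sourceIndex, twice));

        return string.Join(",", active.Select(t => t.Item1));
    }

    public static void Main()
    {
        Console.WriteLine(RemoveFromSource(2)); // 0,1
    }
}
```

With a plain List<Func<int, int>>, removing twice would drop the first occurrence instead, which changes the order in which the remaining transformers are applied.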

Now, you will need an operator that is not available in Rx, called CombineVeryLatest. Have a look here.

var transformedValues = values
    .CombineVeryLatest(transformerArrayObservable, (value, transformers) =>
    {
        return transformers
            .Aggregate(value, (current, transformer) => transformer(current));
    });
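
To see what the Aggregate call does in isolation, here is a small sketch that needs only LINQ. TransformerFoldDemo and ApplyAll are illustrative names, and the sample transformers are made up. It shows that an empty array leaves the value untouched (which is why the empty array works as the neutral start value) and that the order of the transformers matters.

```csharp
using System;
using System.Linq;

public static class TransformerFoldDemo
{
    // Applies each transformer in order, feeding each result into the next one.
    public static int ApplyAll(int value, Func<int, int>[] transformers)
    {
        return transformers.Aggregate(value, (current, transformer) => transformer(current));
    }

    public static void Main()
    {
        var none = new Func<int, int>[0];
        var doubleThenAdd = new Func<int, int>[] { x => x * 2, x => x + 1 };
        var addThenDouble = new Func<int, int>[] { x => x + 1, x => x * 2 };

        Console.WriteLine(ApplyAll(10, none));          // 10
        Console.WriteLine(ApplyAll(10, doubleThenAdd)); // 21
        Console.WriteLine(ApplyAll(10, addThenDouble)); // 22
    }
}
```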

And you should be done. There is some performance to be gained I'm sure, but you'll get the idea.
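
If you would rather not pull in a custom operator, newer releases of the System.Reactive package ship WithLatestFrom, which fires only when the left sequence emits and pairs that value with the latest one from the right. Whether it behaves exactly like CombineVeryLatest is an assumption on my part, so treat the following as a rough sketch. The subjects stand in for values and transformerArrayObservable, and WithLatestFromSketch and Run are illustrative names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;     // assumes the System.Reactive NuGet package is referenced
using System.Reactive.Subjects;

public static class WithLatestFromSketch
{
    public static List<int> Run()
    {
        var values = new Subject<int>();
        // BehaviorSubject replays its current value on subscription, so the
        // empty (neutral) transformer array is available before the first value.
        var transformerArrays = new BehaviorSubject<Func<int, int>[]>(new Func<int, int>[0]);
        var seen = new List<int>();

        using (values
            .WithLatestFrom(transformerArrays, (value, transformers) =>
                transformers.Aggregate(value, (current, t) => t(current)))
            .Subscribe(seen.Add))
        {
            values.OnNext(1);                                               // no transformers active
            transformerArrays.OnNext(new Func<int, int>[] { x => x * 10 }); // should not emit by itself
            values.OnNext(2);                                               // transformed by x * 10
        }

        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", Run()));
    }
}
```

The same timing caveat applies here: a value that arrives before the transformer sequence has produced anything is dropped, which is what the StartWith above is meant to prevent.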

Daniel C. Weber answered Apr 30 '23 17:04