
Stream Arithmetic with Reactive Extensions

I'm currently working on an application that combines many streams of data through equations. What I'd like to be able to do is something like:

var result = (stream1 + stream2) / stream3 + stream4 * 2;

Where result updates whenever any of the streams update. At the moment the only way I can express this in Rx is as:

var result = stream1.CombineLatest(stream2, (x,y) => x+y)
  .CombineLatest(stream3, (x,y) => x / y)
  .CombineLatest(stream4, (x,y) => x + y*2);

Which isn't nearly as clear.

My current idea is as follows:

public class ArithmeticStream : IObservable<double>
{
    public static ArithmeticStream operator +(ArithmeticStream xx, ArithmeticStream yy)
    {
        return Observable.CombineLatest(xx,yy, (x,y) => x + y);
    }
    ...
}

The problem is that CombineLatest returns an IObservable<double> instead of an ArithmeticStream.

Two possible questions:

How can I transparently convert an IObservable<double> into an ArithmeticStream?

Is there an alternative route that will get me the result I want?

Matthew Finlay asked Feb 28 '13 01:02


1 Answer

Hmm...I think you could do it DSL-style relatively easily (no fiddling about with operators):

using System;
using System.Reactive.Linq;

public static class Ext
{
    public static IObservable<double> Const(this double constant)
    {
        return Observable.Return(constant);
    }

    public static IObservable<double> Plus(this IObservable<double> left, IObservable<double> right)
    {
        return left.CombineLatest(right, (l,r) => l + r);
    }
    public static IObservable<double> Minus(this IObservable<double> left, IObservable<double> right)
    {
        return left.CombineLatest(right, (l,r) => l - r);
    }
    public static IObservable<double> Times(this IObservable<double> left, IObservable<double> right)
    {
        return left.CombineLatest(right, (l,r) => l * r);
    }
    public static IObservable<double> Over(this IObservable<double> left, IObservable<double> right)
    {
        return left.CombineLatest(right,(l,r) => l / r);
    }
}

So your query would be:

var result = s1.Plus(s2).Over(s3)
        .Plus(s4.Times(2.0.Const()));
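To see how a chain like this behaves at runtime, here is a minimal sketch that drives it with `Subject<double>` sources. The subjects, the `Demo` class, and the sample values are assumptions made up for illustration; the extension methods are a condensed copy of the ones above. Note that `s4.Times(...)` is nested inside the final `Plus` so that the grouping matches `(s1 + s2) / s3 + s4 * 2`, and that `CombineLatest` emits nothing until every input has produced at least one value.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class Ext
{
    // Condensed versions of the DSL helpers from the answer.
    public static IObservable<double> Const(this double c) => Observable.Return(c);
    public static IObservable<double> Plus(this IObservable<double> l, IObservable<double> r) =>
        l.CombineLatest(r, (a, b) => a + b);
    public static IObservable<double> Times(this IObservable<double> l, IObservable<double> r) =>
        l.CombineLatest(r, (a, b) => a * b);
    public static IObservable<double> Over(this IObservable<double> l, IObservable<double> r) =>
        l.CombineLatest(r, (a, b) => a / b);
}

public static class Demo
{
    public static void Main()
    {
        // Hypothetical inputs standing in for the real streams.
        var s1 = new Subject<double>();
        var s2 = new Subject<double>();
        var s3 = new Subject<double>();
        var s4 = new Subject<double>();

        // (s1 + s2) / s3 + s4 * 2
        var result = s1.Plus(s2).Over(s3).Plus(s4.Times(2.0.Const()));

        using (result.Subscribe(x => Console.WriteLine(x)))
        {
            s1.OnNext(1);
            s2.OnNext(3);
            s3.OnNext(2); // nothing printed yet: s4 has not emitted
            s4.OnNext(5); // prints 12  ((1 + 3) / 2 + 5 * 2)
            s1.OnNext(3); // prints 13  ((3 + 3) / 2 + 5 * 2)
        }
    }
}
```

The constant stream completes right after emitting, but `CombineLatest` keeps using its last value, so the result stays live for as long as the other inputs do.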

Or, for a very chatty variant:

var verboseResult = 
    (s1.Do(Console.WriteLine).Plus(s2.Do(Console.WriteLine)))
    .Over(s3.Do(Console.WriteLine))
    .Plus(s4.Do(Console.WriteLine).Times(2.0.Const()))
    .Do(x => Console.WriteLine("(s1 + s2) / s3 + s4 * 2 = " + x));
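If the operator syntax from the question is still what you want, one possible route is a thin wrapper that holds the `IObservable<double>` and re-wraps every `CombineLatest` result. This is only a sketch under assumptions: the private `Combine` helper, the `ToArithmetic` extension method, and the `Demo` class are hypothetical names introduced here. C# does not allow user-defined conversions from interface types, so the conversion from `IObservable<double>` cannot be fully transparent; an explicit call such as `ToArithmetic()` is needed once per input stream, while plain `double` constants can convert implicitly.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical wrapper: forwards Subscribe to the wrapped observable.
public sealed class ArithmeticStream : IObservable<double>
{
    private readonly IObservable<double> _inner;

    public ArithmeticStream(IObservable<double> inner)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
    }

    public IDisposable Subscribe(IObserver<double> observer) => _inner.Subscribe(observer);

    // Constants become single-value streams, so "stream * 2.0" compiles.
    public static implicit operator ArithmeticStream(double constant) =>
        new ArithmeticStream(Observable.Return(constant));

    // Re-wrap the CombineLatest result so operators can be chained.
    private static ArithmeticStream Combine(
        ArithmeticStream x, ArithmeticStream y, Func<double, double, double> f) =>
        new ArithmeticStream(x._inner.CombineLatest(y._inner, f));

    public static ArithmeticStream operator +(ArithmeticStream x, ArithmeticStream y) =>
        Combine(x, y, (a, b) => a + b);
    public static ArithmeticStream operator -(ArithmeticStream x, ArithmeticStream y) =>
        Combine(x, y, (a, b) => a - b);
    public static ArithmeticStream operator *(ArithmeticStream x, ArithmeticStream y) =>
        Combine(x, y, (a, b) => a * b);
    public static ArithmeticStream operator /(ArithmeticStream x, ArithmeticStream y) =>
        Combine(x, y, (a, b) => a / b);
}

public static class ArithmeticStreamExtensions
{
    // Explicit entry point, since an implicit conversion from an interface is not permitted.
    public static ArithmeticStream ToArithmetic(this IObservable<double> source) =>
        new ArithmeticStream(source);
}

public static class Demo
{
    public static void Main()
    {
        // Hypothetical inputs standing in for stream1..stream4.
        var s1 = new Subject<double>();
        var s2 = new Subject<double>();
        var s3 = new Subject<double>();
        var s4 = new Subject<double>();

        ArithmeticStream a = s1.ToArithmetic(), b = s2.ToArithmetic(),
                         c = s3.ToArithmetic(), d = s4.ToArithmetic();

        var result = (a + b) / c + d * 2.0;

        using (result.Subscribe(x => Console.WriteLine(x)))
        {
            s1.OnNext(1);
            s2.OnNext(3);
            s3.OnNext(2);
            s4.OnNext(5); // prints 12  ((1 + 3) / 2 + 5 * 2)
        }
    }
}
```

The trade-off against the extension-method DSL is one extra type to maintain in exchange for ordinary operator precedence, which removes the need to nest calls by hand to get the grouping right.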
JerKimball answered Sep 19 '22 21:09