
Aggregate function before IObservable sequence is completed

Is there a way to use an aggregate function (Max, Count, ...) with Buffer before a sequence has completed? Once the sequence completes, the aggregate produces results, but with a continuous stream it never emits anything.

I was expecting there to be some way to make this work with Buffer.

IObservable<long> source;
IObservable<IGroupedObservable<long, long>> group  = source
        .Buffer(TimeSpan.FromSeconds(5))
        .GroupBy(i => i % 3);

IObservable<long> sub = group.SelectMany(grp => grp.Max());

sub.Subscribe(l =>
{
    Console.WriteLine("working");
});
user007 asked Dec 15 '25 07:12


2 Answers

Use Scan instead of Aggregate. Scan works just like Aggregate except that it sends out intermediate values as the stream advances. It is good for "running totals", which appears to be what you are asking for.
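To make the difference concrete, here is a minimal sketch, assuming the System.Reactive NuGet package is referenced. The class name ScanVersusMax and the Run helper are hypothetical, and a Subject that is never completed stands in for the live stream from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;      // assumes the System.Reactive NuGet package is referenced
using System.Reactive.Subjects;

static class ScanVersusMax
{
    // Hypothetical helper: pushes the values into a subject that is never
    // completed and records what Max and Scan each emitted.
    public static (List<long> FromMax, List<long> FromScan) Run(params long[] values)
    {
        var source = new Subject<long>();
        var fromMax = new List<long>();
        var fromScan = new List<long>();

        // Max waits for OnCompleted, so nothing arrives here.
        using (source.Max().Subscribe(x => fromMax.Add(x)))
        // Scan publishes the running maximum after every element.
        using (source.Scan((max, x) => Math.Max(max, x)).Subscribe(x => fromScan.Add(x)))
        {
            foreach (long value in values)
                source.OnNext(value);
        }

        return (fromMax, fromScan);
    }

    static void Main()
    {
        var (fromMax, fromScan) = Run(3, 1, 4, 1, 5);
        Console.WriteLine($"Max emitted {fromMax.Count} values");          // 0
        Console.WriteLine("Scan emitted: " + string.Join(", ", fromScan)); // 3, 3, 4, 4, 5
    }
}
```

Scan without a seed passes the first element through unchanged and then applies the accumulator to each later element, which is why the output starts with 3.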

Brandon answered Dec 16 '25 22:12


All the "statistical" operators in Rx (Min/Max/Sum/Count/Average) propagate the calculated value only when the source sequence completes. That is the big difference between Scan and Aggregate: if you want to be notified each time a new value is pushed into your subscription, you need to use Scan.

In your case, if you want to keep the same logic, you should combine it with the GroupByUntil or Window operators. Both let you create and complete the group subscriptions at regular intervals, and each completion is what pushes the next aggregated value.
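As a rough sketch of the Window variant, assuming the System.Reactive NuGet package: each 5-second window completes on its own, so the groups inside it complete too and Max can emit once per window. The names WindowedMax, MaxPerGroup and Demo are hypothetical, and the HistoricalScheduler is only there so the demo can close a window in virtual time instead of waiting.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;      // assumes the System.Reactive NuGet package is referenced
using System.Reactive.Subjects;

static class WindowedMax
{
    // Hypothetical helper: emits the maximum of every (i % 3) group once per window.
    public static IObservable<long> MaxPerGroup(
        IObservable<long> source, TimeSpan span, IScheduler scheduler)
    {
        return source
            .Window(span, scheduler)              // each window completes after `span`
            .SelectMany(window => window
                .GroupBy(i => i % 3)              // groups complete with their window
                .SelectMany(grp => grp.Max()));   // so Max can emit once per window
    }

    // Demo in virtual time; the source subject itself is never completed.
    public static List<long> Demo()
    {
        var scheduler = new HistoricalScheduler();
        var source = new Subject<long>();
        var results = new List<long>();

        using (MaxPerGroup(source, TimeSpan.FromSeconds(5), scheduler)
                   .Subscribe(x => results.Add(x)))
        {
            for (long i = 0; i <= 6; i++)
                source.OnNext(i);

            // Moving the virtual clock past 5 s closes the first window.
            scheduler.AdvanceBy(TimeSpan.FromSeconds(6));
        }

        results.Sort(); // the order in which the groups report is not relied upon
        return results;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Demo()));
    }
}
```

In a real application the scheduler argument can be dropped, so that Window(TimeSpan.FromSeconds(5)) runs on the default scheduler against the live source.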

You can get more info here: http://www.introtorx.com/content/v1.0.10621.0/07_Aggregation.html#BuildYourOwn

By the way, I wrote an article related to what you want: http://www.codeproject.com/Tips/853256/Real-time-statistics-with-Rx-Statistical-Demo-App

J. Lennon answered Dec 16 '25 22:12


