Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to get Sum of "last" items of N hot Observable<decimal> instances?

EDIT: On 09/15/2013 - I am describing my scenario further broken into steps to help everybody understand my situation better. Added the source for whole application for download too. If you want to jump to the original question, scroll down to the last heading. Please let me know the questions. Thanks

Summary

Alaska state Capital Juneau has a AST (Alaska State Trooper) headquarters building where they would like to show a large screen with a single number displayed and updated automatically. That number is called (Crime Quotient Index) or CQI

CQI is basically a calculated number to show the current crime situation of the state...

How it is calculated?

The program that runs the screen is a .NET WPF Application that is constantly receiving CrimeReport objects through a Hot IObservable stream.

CQI is calculated per city and then a Sum() for all cities is taken which is called States CQI Here are the steps of calculating the State CQI

Step 1 - Receive crime data

CrimeReport is sent to the .NET Application each time a crime is reported. It has the following components

DateTime of the crime

City - City/County of Jurisdiction

SeverityLevel - Serious/NonSerious

EstimatedSolveTime - Estimated number of days AST determines it would take to solve the crime.

So in this step, we subscribe to the IObservable and create instance of MainViewModel

IObservable<CrimeReport> reportSource = mainSource.Publish();
  MainVM = new MainViewModel(reportSource);
reportSource.Connect();

Step 2 - Group by city and do maths per city

As you receive the Report, group it by city so

var cities = reportSource.GroupBy(k => k.City)
                  .Select(g => new CityDto(g.Key, g);

CityDto is a DTO class that takes all its report for current city and calculates the City's CQI.

The calculation of City's CQI is done by the following formula

if Ratio of Total number serious crimes to Total number of Non serious crimes is less than 1

then

City's CQI = Ratio x Minimum of Estimated Solve time

else

City's CQI = Ratio x Maximum Estimated Solve time

Here is class definition of CityDto

internal class CityDto 
{
 public string CityName { get; set; }
 public IObservable<decimal> CityCqi {get; set;}

 public CityDto(string cityName, IObservable<CrimeReport> cityReports)
 {
   CityName = cityName;
   // Get all serious and non serious crimes
   //
     var totalSeriousCrimes = cityReports.Where(c => c.Severity == CrimeSeverity.Serious)
     .Scan(0, (p, _) => p++);
     var totalnonSeriousCrimes = cityReports.Where(c => c.Severity == CrimeSeverity.NonSerious)
     .Scan(0, (p, _) => p++);

     // Get the ratio
     //
     var ratio = Observable.CombineLatest(totalSeriousCrimes, totalnonSeriousCrimes, 
                   (s, n) => n ==  0? s : s/n); // Avoding DivideByZero here

     // Get the minimum and maximum estimated solve time
     //
       var minEstimatedSolveTime = cityReports.Select(c => c.EstimatedSolveTime) 
                     .Scan(5000, (p, n) => n < p? n : p);
       var maxEstimatedSolveTime = cityReports.Select(c=>c.EstimatedSolveTime)
                     .Scan(0, (p, n) => n > p? n : p);

    //Time for the City's CQI
    // 
      CityCqi = Observable.CombineLatest(ratio, minEstimatedSolveTime, maxEstimatedSolveTime, (r, n, x) => r < 1.0? r * n : r * m);
 }
}

Now that we have City DTO objects maintaining City's CQI values and exposing that live CQI through an IObservable, Alaska State's Capital would like to Sum() up all the Cities' CQI to show it as Alaska's CQI and show it live on the screen and Each crime reported anywhere in the City/County participating in CQI program should have an immediate effect on the State's CQI

Step 3 - Roll up cities' data for state

Now we have to calculate the whole state's CQI which is live updating on the big screen, we have State's view model called MainViewModel

internal class MainViewModel
{
    public MainViewModel(IObservable<CrimeReport> mainReport)
    {
         /// Here is the snippet also mentioned in Step 2
         //
           var cities = mainReport.GroupBy(k => k.City)
                  .Select(g => new CityDto(g.Key, g));

         ///// T h i s ///// Is //// Where //// I /// am /// Stuck
         //
          var allCqis = cities.Select(c => c.CityCqi); // gives you IObservable<IObservable<decimal>> ,

         /// Need to use latest of each observable in allCqi and sum them up
           //// How do I do it ?
     }
}

Constraints

  • Not all cities in Alaska currently participate in state's CQI program, but cities are enrolling day by day so I cannot have List and adding all the cities regardless of enrollment is not practical either. So it is IObservable which maintains only those cities not only participating but also have sent at least one CrimeReport object.

Full Source code

Source can be downloaded by clicking Here

Originally asked question

I have a single hot observable of multiple hot observables...

IObservable<IObservable<decimal>>

I would like an observable that when subscribed will keep its observer informed of the sum of all "Latest" decimal numbers from all observables inside.

How can I achieve that ? I tried CombineLatest(...) but could not get it right.

Thanks

like image 507
fahadash Avatar asked Sep 14 '13 10:09

fahadash


1 Answers

The Rxx library has an overload of CombineLatest() which takes an IObservable<IObservable<T>>. If you use this overload, then the solution is easy:

var runningSum = allCqis
    .Select(cqi => cqi.StartWith(0)) // start each inner sequence off with 0
    .CombineLatest() // produces an IObservable<IList<decimal>>
    .Select(cqis => cqis.Sum()); // LINQ operator Sum(IEnumerable<decimal>)

Looking at the source code for Rxx.CombineLatest might be useful to see how the problem is solved "under the hood"

like image 64
Brandon Avatar answered Nov 21 '22 03:11

Brandon