EDIT (09/15/2013): I have described my scenario in more detail, broken into steps, to help everybody understand my situation better. I have also added the source for the whole application for download. If you want to jump to the original question, scroll down to the last heading. Please let me know if you have any questions. Thanks.
Juneau, Alaska's state capital, has an AST (Alaska State Trooper) headquarters building where they would like to show a large screen with a single number that is displayed and updated automatically. That number is called the Crime Quotient Index, or CQI.
CQI is basically a calculated number to show the current crime situation of the state...
The program that runs the screen is a .NET WPF Application that is constantly receiving CrimeReport objects through a Hot IObservable stream.
CQI is calculated per city, and then a Sum() over all cities is taken, which is called the State's CQI. Here are the steps for calculating the State's CQI.
A CrimeReport is sent to the .NET application each time a crime is reported. It has the following components:
DateTime of the crime
City - City/County of Jurisdiction
SeverityLevel - Serious/NonSerious
EstimatedSolveTime - Estimated number of days AST determines it would take to solve the crime.
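So in this step, we subscribe to the IObservable and create an instance of MainViewModel:
// Publish() returns an IConnectableObservable, which is the type that exposes Connect()
IConnectableObservable<CrimeReport> reportSource = mainSource.Publish();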
MainVM = new MainViewModel(reportSource);
reportSource.Connect();
As each report arrives, group it by city:
var cities = reportSource.GroupBy(k => k.City)
                         .Select(g => new CityDto(g.Key, g));
CityDto is a DTO class that takes all the reports for its city and calculates the City's CQI.
The City's CQI is calculated with the following formula:
if the Ratio of the total number of serious crimes to the total number of non-serious crimes is less than 1
then
    City's CQI = Ratio x Minimum Estimated Solve Time
else
    City's CQI = Ratio x Maximum Estimated Solve Time
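To make the rule concrete, here is a minimal sketch that applies it to plain numbers, with no Rx involved. The class and method names (`CqiFormula`, `CityCqi`) are hypothetical, solve times are assumed to be whole days, and the fallback for a zero non-serious count mirrors the guard used in the CityDto code in this question.

```csharp
using System;

public static class CqiFormula
{
    // Hypothetical helper: applies the City CQI rule to fixed counts.
    public static decimal CityCqi(int serious, int nonSerious,
                                  int minSolveDays, int maxSolveDays)
    {
        // Assumption: with no non-serious crimes, the ratio falls back
        // to the serious count instead of dividing by zero.
        decimal ratio = nonSerious == 0
            ? serious
            : (decimal)serious / nonSerious; // cast avoids integer division

        // Ratio below 1 scales the minimum solve time, otherwise the maximum.
        return ratio < 1m ? ratio * minSolveDays : ratio * maxSolveDays;
    }
}
```

For example, 1 serious and 4 non-serious crimes with solve times between 2 and 10 days give a ratio of 0.25 and a CQI of 0.25 x 2 = 0.5, while 3 serious and 2 non-serious crimes give a ratio of 1.5 and a CQI of 1.5 x 10 = 15.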
Here is the class definition of CityDto:
internal class CityDto
{
    public string CityName { get; set; }
    public IObservable<decimal> CityCqi { get; set; }

    public CityDto(string cityName, IObservable<CrimeReport> cityReports)
    {
        CityName = cityName;

        // Running counts of serious and non-serious crimes.
        // Use p + 1 here: p++ returns the old value, so the count would never grow.
        var totalSeriousCrimes = cityReports.Where(c => c.Severity == CrimeSeverity.Serious)
                                            .Scan(0, (p, _) => p + 1);
        var totalNonSeriousCrimes = cityReports.Where(c => c.Severity == CrimeSeverity.NonSerious)
                                               .Scan(0, (p, _) => p + 1);

        // Ratio of serious to non-serious crimes, as a decimal to avoid integer division
        var ratio = Observable.CombineLatest(totalSeriousCrimes, totalNonSeriousCrimes,
            (s, n) => n == 0 ? (decimal)s : (decimal)s / n); // Avoiding DivideByZero here

        // Running minimum and maximum estimated solve time
        var minEstimatedSolveTime = cityReports.Select(c => c.EstimatedSolveTime)
                                               .Scan(5000, (p, n) => n < p ? n : p);
        var maxEstimatedSolveTime = cityReports.Select(c => c.EstimatedSolveTime)
                                               .Scan(0, (p, n) => n > p ? n : p);

        // Time for the City's CQI
        CityCqi = Observable.CombineLatest(ratio, minEstimatedSolveTime, maxEstimatedSolveTime,
            (r, n, x) => r < 1m ? r * n : r * x);
    }
}
Now that we have CityDto objects maintaining each City's CQI and exposing that live value through an IObservable, Alaska's state capital would like to Sum() up all the Cities' CQIs and show the result live on the screen as Alaska's CQI. Each crime reported in any City/County participating in the CQI program should have an immediate effect on the State's CQI.
Now we have to calculate the whole State's CQI, which updates live on the big screen. For that, we have the State's view model, called MainViewModel:
internal class MainViewModel
{
    public MainViewModel(IObservable<CrimeReport> mainReport)
    {
        // Here is the snippet also mentioned in Step 2
        var cities = mainReport.GroupBy(k => k.City)
                               .Select(g => new CityDto(g.Key, g));

        // This is where I am stuck
        var allCqis = cities.Select(c => c.CityCqi); // gives you IObservable<IObservable<decimal>>

        // Need to use the latest of each observable in allCqis and sum them up.
        // How do I do it?
    }
}
Source can be downloaded by clicking Here
I have a single hot observable of multiple hot observables...
IObservable<IObservable<decimal>>
I would like an observable that when subscribed will keep its observer informed of the sum of all "Latest" decimal numbers from all observables inside.
How can I achieve that? I tried CombineLatest(...) but could not get it right.
Thanks
The Rxx library has an overload of CombineLatest() which takes an IObservable<IObservable<T>>. If you use this overload, then the solution is easy:
var runningSum = allCqis
    .Select(cqi => cqi.StartWith(0)) // start each inner sequence off with 0
    .CombineLatest()                 // produces an IObservable<IList<decimal>>
    .Select(cqis => cqis.Sum());     // LINQ operator Sum(IEnumerable<decimal>)
Looking at the source code for Rxx.CombineLatest might be useful to see how the problem is solved "under the hood".
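If adding Rxx is not an option, one possible alternative using only the standard Rx operators is to turn every inner sequence into a stream of changes and fold all of those changes into one running total. This is a sketch of that idea, not how Rxx implements CombineLatest. It assumes the System.Reactive package is referenced; the `SumOfLatest` extension name and the city subjects in the demo are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class RunningSumSketch
{
    // Hypothetical helper, not part of Rx or Rxx.
    public static IObservable<decimal> SumOfLatest(
        this IObservable<IObservable<decimal>> sources)
    {
        return sources
            // Turn each inner sequence into "change since its previous value".
            // The first value of an inner sequence is measured against 0.
            .SelectMany(inner => inner
                .Scan(new { Last = 0m, Delta = 0m },
                      (acc, value) => new { Last = value, Delta = value - acc.Last })
                .Select(acc => acc.Delta))
            // Fold every change from every inner sequence into one total.
            .Scan(0m, (sum, delta) => sum + delta);
    }
}

public static class Demo
{
    public static List<decimal> Run()
    {
        var outer = new Subject<IObservable<decimal>>();
        var juneau = new Subject<decimal>();     // illustrative city stream
        var anchorage = new Subject<decimal>();  // illustrative city stream
        var seen = new List<decimal>();

        using (outer.SumOfLatest().Subscribe(v => seen.Add(v)))
        {
            outer.OnNext(juneau);
            outer.OnNext(anchorage);
            juneau.OnNext(2m);     // total becomes 2
            anchorage.OnNext(5m);  // total becomes 7
            juneau.OnNext(1.5m);   // juneau drops by 0.5, total becomes 6.5
        }
        return seen;
    }
}
```

Unlike the StartWith(0) version above, this sketch emits nothing until some inner sequence produces a value, and it keeps only one running total instead of a list of the latest values.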