
How to turn a list of Tasks into an Observable and process elements as they are completed?

Given a collection of Tasks:

var americanAirlines = new FlightPriceChecker("AA");
...
var runningTasks = new List<Task<IList<FlightPrice>>>
{
    americanAirlines.GetPricesAsync(from, to),
    delta.GetPricesAsync(from, to),
    united.GetPricesAsync(from, to)
};

I would like to process the results of GetPricesAsync() in whatever order they arrive. Currently, I'm using a while loop to achieve this:

while (runningTasks.Any())
{
    // Wait for any task to finish
    var completed = await Task.WhenAny(runningTasks);
    // Remove from running list   
    runningTasks.Remove(completed);
    // Process the completed task (updates a property we may be binding to)
    UpdateCheapestFlight(completed.Result);
}

Is this a problem that can be solved more elegantly using Rx? I tried something like the code below but got stuck: somewhere I'd have to await each getFlightPriceTask, which would block and only then execute the next one, instead of taking the first one that's done and then waiting for the next:

runningTasks
  .ToObservable()
  .Select(getFlightPriceTask => .???.)
Krumelur asked Jul 26 '18 14:07



2 Answers

Try this:

runningTasks
  .Select(getFlightPriceTask => getFlightPriceTask.ToObservable())
  .Merge()
  .Subscribe(flightPrices => UpdateCheapestFlight(flightPrices));
Shlomo answered Nov 04 '22 18:11



@Shlomo's answer helped me a lot (using Merge() was the trick!) and I'd like to comment on it and also present two alternative solutions.

Commenting on Shlomo's solution

This solution is very simple and expresses the elegance of Rx. The only problem is that it cannot be awaited for completion. This is typically not a problem in production code, where we would only care about updating a property that is then bound to the UI. My other comment is that the calculation is done in Subscribe(); some like to keep subscriptions super lightweight, but I think that's mostly personal preference.

runningTasks
  // Get all tasks and turn them into Observables.
  .Select(getFlightPriceTask => getFlightPriceTask.ToObservable())
  // Merge all tasks (in my case 3) into one "lane". Think of cars trying
  // to leave a three lane highway and going for a one lane exit.
  .Merge()
  // For every task "leaving the highway" calculate the minimum price.
  .Subscribe(flightPrices => UpdateCheapestFlight(flightPrices));
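
To try the pattern in isolation, here is a minimal sketch. It assumes the System.Reactive NuGet package is referenced. FakePricesAsync, the delays, and the decimal prices are hypothetical stand-ins for FlightPriceChecker.GetPricesAsync() and FlightPrice, which the question does not show. Because Subscribe() itself cannot be awaited, the sketch uses the onCompleted callback together with a TaskCompletionSource so that a caller can still wait for the end of the sequence.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class MergeSketch
{
    // Hypothetical stand-in for FlightPriceChecker.GetPricesAsync():
    // returns the given prices after an artificial delay.
    private static async Task<IList<decimal>> FakePricesAsync(int delayMs, params decimal[] prices)
    {
        await Task.Delay(delayMs);
        return prices;
    }

    // Returns each airline's cheapest price, in the order the results arrived.
    public static Task<List<decimal>> RunAsync()
    {
        var runningTasks = new List<Task<IList<decimal>>>
        {
            FakePricesAsync(300, 420m, 390m), // slowest
            FakePricesAsync(100, 510m, 480m), // fastest
            FakePricesAsync(200, 350m, 610m)
        };

        var arrivals = new List<decimal>();
        var done = new TaskCompletionSource<List<decimal>>();

        runningTasks
            .Select(task => task.ToObservable())
            .Merge()
            .Subscribe(
                prices => arrivals.Add(prices.Min()),  // runs once per completed task
                error => done.TrySetException(error),  // a faulted task ends the sequence
                () => done.TrySetResult(arrivals));    // every task has completed

        return done.Task;
    }
}
```

With these delays the fastest task's result should be recorded first, regardless of its position in the list. Merge() serializes the notifications from its sources, so the plain List<decimal> is not written concurrently here.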

Alternative 1: using Do()

This version never calls Subscribe() explicitly, which is kind of against the idea of Rx (awaiting the observable subscribes behind the scenes), but it can be awaited and therefore behaves like the original version.

await runningTasks
    .Select(getFlightPriceTask => getFlightPriceTask.ToObservable())
    .Merge()
    // Process result of each task.
    .Do(flightPrices => UpdateCheapestFlight(flightPrices))
    // Take(n) completes only after all n tasks have delivered a result.
    .Take(runningTasks.Count);

Alternative 2: eliminating UpdateCheapestFlight()

Finally, I think a more Rx-style way to do this is to drop the original helper method altogether and tell an "Rx story" that is easy to read.

var minFlightPrice = await runningTasks
    // Get all the tasks and turn them into Observables 
    .Select(getFlightPriceTask => getFlightPriceTask.ToObservable())
    // Merge all three into one "lane".
    .Merge()
    // Get local minimum value of each airline
    .Select(x => x.Min())
    // Take all the local minimums...
    .Take(runningTasks.Count)
    // ...and find the minimum of them.
    .Min();
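
A self-contained sketch of this last variant follows, again assuming the System.Reactive NuGet package. FakePricesAsync and the decimal prices are hypothetical placeholders for the real price checker and for FlightPrice. The sketch leaves out Take(...): each task-backed observable emits one value and then completes, and Merge() completes once all of its sources have completed, so the sequence should end without it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class MinPriceSketch
{
    // Hypothetical stand-in for FlightPriceChecker.GetPricesAsync().
    private static async Task<IList<decimal>> FakePricesAsync(int delayMs, params decimal[] prices)
    {
        await Task.Delay(delayMs);
        return prices;
    }

    public static async Task<decimal> CheapestAsync()
    {
        var runningTasks = new List<Task<IList<decimal>>>
        {
            FakePricesAsync(300, 420m, 390m),
            FakePricesAsync(100, 510m, 480m),
            FakePricesAsync(200, 350m, 610m)
        };

        return await runningTasks
            // Turn every task into a single-element observable.
            .Select(task => task.ToObservable())
            // Interleave the results in completion order.
            .Merge()
            // Cheapest price per airline.
            .Select(prices => prices.Min())
            // Cheapest price overall, emitted once the merged sequence completes.
            .Min();
    }
}
```

Calling `await MinPriceSketch.CheapestAsync()` yields 350 for the sample data, independent of the order in which the three tasks finish.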
Krumelur answered Nov 04 '22 17:11
