Given a collection of Tasks:
var americanAirlines = new FlightPriceChecker("AA");
...
var runningTasks = new List<Task<IList<FlightPrice>>>
{
americanAirlines.GetPricesAsync(from, to),
delta.GetPricesAsync(from, to),
united.GetPricesAsync(from, to)
};
I would like to process the results of GetPricesAsync() in whatever order they arrive. Currently, I'm using a while loop to achieve this:
while (runningTasks.Any())
{
// Wait for any task to finish
var completed = await Task.WhenAny(runningTasks);
// Remove from running list
runningTasks.Remove(completed);
// Process the completed task (updates a property we may be binding to)
UpdateCheapestFlight(completed.Result);
}
Is this a problem that can be solved more elegantly using Rx?
I tried something like the code below but got stuck: somewhere I'd have to await each getFlightPriceTask, which would wait for that task before moving on to the next one, instead of taking whichever finishes first and then waiting for the next:
runningTasks
.ToObservable()
.Select(getFlightPriceTask => .???.)
Try this:
runningTasks
.Select(getFlightPriceTask => getFlightPriceTask.ToObservable())
.Merge()
.Subscribe(flightPrices => UpdateCheapestFlight(flightPrices));
@Shlomo's answer helped me a lot (using Merge() was the trick!) and I'd like to comment on it and also present an alternative solution.
Commenting on Shlomo's solution
This solution is very simple and expresses the elegance of Rx. The only problem is that it cannot be awaited for completion. This is typically not a problem in production code, where we only care about updating a property that is then bound to the UI.
The other comment I have is that the calculation is done in Subscribe(). Some like to keep subscriptions super lightweight, but I think that's mostly personal preference.
runningTasks
// Get all tasks and turn them into Observables.
.Select(getFlightPriceTask => getFlightPriceTask.ToObservable())
// Merge all tasks (in my case 3) into one "lane". Think of cars trying
// to leave a three lane highway and going for a one lane exit.
.Merge()
// For every task "leaving the highway" calculate the minimum price.
.Subscribe(flightPrices => UpdateCheapestFlight(flightPrices));
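If completion or failure of the tasks does matter, one option is the Subscribe() overload that also takes onError and onCompleted callbacks. The following is only a minimal sketch under assumptions: FakePricesAsync and SubscribeSketch are hypothetical stand-ins for the real GetPricesAsync() calls, prices are plain decimals, and a TaskCompletionSource is just one possible way to surface completion to the caller.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class SubscribeSketch
{
    // Hypothetical stand-in for GetPricesAsync(): returns prices after a delay.
    private static async Task<IList<decimal>> FakePricesAsync(int delayMs, params decimal[] prices)
    {
        await Task.Delay(delayMs);
        return prices;
    }

    // Returns a task that completes once every price list has been handled.
    public static Task<decimal> RunAsync()
    {
        var runningTasks = new List<Task<IList<decimal>>>
        {
            FakePricesAsync(300, 420m, 390m),
            FakePricesAsync(100, 510m, 450m),
            FakePricesAsync(200, 380m, 400m)
        };

        var done = new TaskCompletionSource<decimal>();
        var cheapest = decimal.MaxValue;

        runningTasks
            .Select(t => t.ToObservable())
            .Merge()
            .Subscribe(
                // onNext: called once per finished task, in completion order.
                prices => cheapest = Math.Min(cheapest, prices.Min()),
                // onError: a faulted task ends the merged sequence.
                error => done.TrySetException(error),
                // onCompleted: all tasks have produced their result.
                () => done.TrySetResult(cheapest));

        return done.Task;
    }
}
```

Without an onError callback, an exception from one of the tasks would be rethrown by the subscription instead of being handed to the caller, so passing one is usually worth the extra line.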
Alternative 1: using Do()
This does not use Subscribe() at all, which is kind of against the idea of Rx, but it can be awaited and therefore behaves like the original version.
await runningTasks
.Select(getFlightPriceTask => getFlightPriceTask.ToObservable())
.Merge()
// Process result of each task.
.Do(flightPrices => UpdateCheapestFlight(flightPrices))
// Taking all elements will only complete if all three tasks have completed.
.Take(runningTasks.Count);
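A possible variation on the same idea is ForEachAsync() from System.Reactive, which runs an action for every element and returns a Task that completes when the sequence completes. The sketch below is an assumption-laden illustration, not the original code: FakePricesAsync and ForEachSketch are hypothetical names, and prices are plain decimals.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class ForEachSketch
{
    // Hypothetical stand-in for GetPricesAsync(): returns prices after a delay.
    private static async Task<IList<decimal>> FakePricesAsync(int delayMs, params decimal[] prices)
    {
        await Task.Delay(delayMs);
        return prices;
    }

    // Records the cheapest price of each airline in the order results arrive.
    public static async Task<List<decimal>> CollectCheapestInArrivalOrderAsync()
    {
        var runningTasks = new List<Task<IList<decimal>>>
        {
            FakePricesAsync(300, 420m, 390m),
            FakePricesAsync(100, 510m, 450m),
            FakePricesAsync(200, 380m, 400m)
        };

        var arrivals = new List<decimal>();

        // The returned Task completes after the last task has been processed.
        await runningTasks
            .Select(t => t.ToObservable())
            .Merge()
            .ForEachAsync(prices => arrivals.Add(prices.Min()));

        return arrivals;
    }
}
```

Because the merged sequence completes on its own once all tasks have finished, this variant needs neither Do() nor Take().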
Alternative 2: eliminating UpdateCheapestFlight()
Finally, I think a more Rx-style way to do this is to drop the original helper method entirely and tell an "Rx story" that is easy to read.
var minFlightPrice = await runningTasks
// Get all the tasks and turn them into Observables
.Select(getFlightPriceTask => getFlightPriceTask.ToObservable())
// Merge all three into one "lane".
.Merge()
// Get local minimum value of each airline
.Select(x => x.Min())
// Take all the local minimums...
.Take(runningTasks.Count)
// ...and find the minimum of them.
.Min();
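To try the pipeline above in isolation, here is a self-contained sketch. It rests on assumptions: the FlightPrice record with Airline and Amount members is hypothetical (the real type is not shown here), FakePricesAsync stands in for GetPricesAsync(), and the minimum is taken over Amount rather than over FlightPrice itself. Take() is left out because the merged sequence should complete by itself once every task has finished.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Hypothetical price type; the real FlightPrice may look different.
public sealed record FlightPrice(string Airline, decimal Amount);

public static class MinPriceSketch
{
    // Hypothetical stand-in for GetPricesAsync(): returns prices after a delay.
    private static async Task<IList<FlightPrice>> FakePricesAsync(
        string airline, int delayMs, params decimal[] amounts)
    {
        await Task.Delay(delayMs);
        return amounts.Select(a => new FlightPrice(airline, a)).ToList();
    }

    public static async Task<decimal> FindCheapestAsync()
    {
        var runningTasks = new List<Task<IList<FlightPrice>>>
        {
            FakePricesAsync("AA", 300, 420m, 390m),
            FakePricesAsync("DL", 100, 510m, 450m),
            FakePricesAsync("UA", 200, 380m, 400m)
        };

        return await runningTasks
            // Turn every task into a one-element observable.
            .Select(t => t.ToObservable())
            // Merge them so results flow in completion order.
            .Merge()
            // Local minimum per airline.
            .Select(prices => prices.Min(p => p.Amount))
            // Overall minimum, emitted when the merged sequence completes.
            .Min();
    }
}
```

With the sample data, FindCheapestAsync() yields 380, regardless of which fake airline answers first.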