I'm trying to get my head around the right use cases for Reactive Extensions (Rx). The examples that keeps coming up are UI events (drag and drop, drawing), and suggestions that Rx is suitable for asynchronous applications/operations such as web service calls.
I'm working on an application where I need to write a tiny client API for a REST service. I need to call four REST end-points, three to get some reference data (Airports, Airlines, and Statuses), and the fourth is the main service that will give you flight times for a given airport.
I have created classes exposing the three reference data services, and the methods look something like this:
public Observable<IEnumerable<Airport>> GetAirports()
public Observable<IEnumerable<Airline>> GetAirlines()
public Observable<IEnumerable<Status>> GetStatuses()
public Observable<IEnumerable<Flights>> GetFlights(string airport)
In my GetFlights method I want each Flight to hold a reference the Airport it's departing from, and the Airline operating the flight. To do that I need the data from GetAirports and GetAirlines to be available. Each Airport, Airline and Status will be added to a Dictionar (ie.e Dictionary) so that I can easily set the reference when parsing each flight.
flight.Airport = _airports[flightNode.Attribute("airport").Value]
flight.Airline = _airlines[flightNode.Attribute("airline").Value]
flight.Status = _statuses[flightNode.Attribute("status").Value]
My current implementation now looks like this:
public IObservable<IEnumerable<Flight>> GetFlightsFrom(Airport fromAirport)
{
var airports = new AirportNamesService().GetAirports();
var airlines = new AirlineNamesService().GetAirlines();
var statuses = new StatusService().GetStautses();
var referenceData = airports
.ForkJoin(airlines, (allAirports, allAirlines) =>
{
Airports.AddRange(allAirports);
Airlines.AddRange(allAirlines);
return new Unit();
})
.ForkJoin(statuses, (nothing, allStatuses) =>
{
Statuses.AddRange(allStatuses);
return new Unit();
});
string url = string.Format(_serviceUrl, 1, 7, fromAirport.Code);
var flights = from data in referenceData
from flight in GetFlightsFrom(url)
select flight;
return flights;
}
private IObservable<IEnumerable<Flight>> GetFlightsFrom(string url)
{
return WebRequestFactory.GetData(new Uri(url), ParseFlightsXml);
}
The current implementation is based on Sergey's answer, and uses ForkJoin to ensure sequential execution and that I reference data gets loaded before Flights. This implementation is allot more elegant than having to fire a "ReferenceDataLoaded" event like my previous implementation.
I think, if you are receiving a list of entities from each REST call, your call should have a little different signature - you are not observing each value in the return collection, you are observing the event of the call completion. So for airports, it should have the signature:
public IObservable<Aiports> GetAirports()
The next step would be to run first three in parallel and wait on all of them:
var ports_lines_statuses =
Observable.ForkJoin(GetAirports(), GetAirlines(), GetStatuses());
The third step woul be to compose the above abservable with the GetFlights():
var decoratedFlights =
from pls in ports_lines_statuses
let airport = MyAirportFunc(pls)
from flight in GetFlights(airport)
select flight;
EDIT: I still do not understand why your services return
IObservable<Airport>
instead of
IObservable<IEnumerable<Airport>>
AFAIK, from the REST call you get all entities at once - but maybe you do paging? Anyway, if you want RX do the buffering you could use .BufferWithCount() :
var allAirports = new AirportNamesService()
.GetAirports().BufferWithCount(int.MaxValue);
...
Then you can apply ForkJoin:
var ports_lines_statuses =
allAirports
.ForkJoin(allAirlines, PortsLinesSelector)
.ForkJoin(statuses, ...
ports_lines_statuses would contain a single event on the timeline which would contain all the reference data.
EDIT: Here's another one, using the freshly minted ListObservable (latest release only):
allAiports = airports.Start();
allAirlines = airlines.Start();
allStatuses = statuses.Start();
...
whenReferenceDataLoaded =
Observable.Join(airports.WhenCompleted()
.And(airlines.WhenCompleted())
.And(statuses.WhenCompleted())
Then((p, l, s) => new Unit()));
public static IObservable<Unit> WhenCompleted<T>(this IObservable<T> source)
{
return source
.Materialize()
.Where(n => n.Kind == NotificationKind.OnCompleted)
.Select(_ => new Unit());
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With