
Creating a REST client API using Reactive Extensions (Rx)

I'm trying to get my head around the right use cases for Reactive Extensions (Rx). The examples that keep coming up are UI events (drag and drop, drawing), along with suggestions that Rx is suitable for asynchronous operations such as web service calls.

I'm working on an application where I need to write a tiny client API for a REST service. I need to call four REST endpoints: three return reference data (Airports, Airlines, and Statuses), and the fourth is the main service, which returns flight times for a given airport.

I have created classes exposing the three reference data services, and the methods look something like this:

public IObservable<IEnumerable<Airport>> GetAirports()
public IObservable<IEnumerable<Airline>> GetAirlines()
public IObservable<IEnumerable<Status>> GetStatuses()
public IObservable<IEnumerable<Flight>> GetFlights(string airport)

In my GetFlights method I want each Flight to hold a reference to the Airport it's departing from and to the Airline operating the flight. To do that, I need the data from GetAirports and GetAirlines to be available. Each Airport, Airline and Status will be added to a Dictionary (e.g. Dictionary<string, Airport>) so that I can easily set the reference when parsing each flight.

flight.Airport = _airports[flightNode.Attribute("airport").Value];
flight.Airline = _airlines[flightNode.Attribute("airline").Value];
flight.Status = _statuses[flightNode.Attribute("status").Value];
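
As a rough illustration of that lookup, here is a minimal sketch. The XML shape (`<flight airport="..." airline="..." status="..." />`), the record types, and the `FlightParser` and `Required` names are all assumptions made up for the example; only the three attribute names come from the snippet above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Xml.Linq;

// Simplified stand-ins for the real entities (hypothetical shapes).
public record Airport(string Code);
public record Airline(string Code);
public record Status(string Code);
public record Flight(Airport Airport, Airline Airline, Status Status);

public static class FlightParser
{
    // Parses every <flight> element and resolves its attributes against
    // reference data that has already been loaded into dictionaries.
    public static List<Flight> ParseFlightsXml(
        string xml,
        IReadOnlyDictionary<string, Airport> airports,
        IReadOnlyDictionary<string, Airline> airlines,
        IReadOnlyDictionary<string, Status> statuses)
    {
        return XDocument.Parse(xml)
            .Descendants("flight")
            .Select(node => new Flight(
                airports[Required(node, "airport")],
                airlines[Required(node, "airline")],
                statuses[Required(node, "status")]))
            .ToList();
    }

    // Fails with a clear message when an attribute is missing,
    // rather than with a NullReferenceException.
    private static string Required(XElement node, string name) =>
        node.Attribute(name)?.Value
        ?? throw new FormatException($"Missing '{name}' attribute on <flight>.");
}
```

A code that is absent from a dictionary makes the indexer throw a KeyNotFoundException, which is why the reference data has to be fully loaded before any flight is parsed.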

My current implementation now looks like this:

public IObservable<IEnumerable<Flight>> GetFlightsFrom(Airport fromAirport)
{
    var airports = new AirportNamesService().GetAirports();
    var airlines = new AirlineNamesService().GetAirlines();
    var statuses = new StatusService().GetStatuses();


    var referenceData = airports
        .ForkJoin(airlines, (allAirports, allAirlines) =>
                            {
                                Airports.AddRange(allAirports);
                                Airlines.AddRange(allAirlines);
                                return new Unit();
                            })
        .ForkJoin(statuses, (nothing, allStatuses) =>
                            {
                                Statuses.AddRange(allStatuses);
                                return new Unit();
                            });

    string url = string.Format(_serviceUrl, 1, 7, fromAirport.Code);

    var flights = from data in referenceData
                    from flight in GetFlightsFrom(url)
                    select flight;

    return flights;
}

private IObservable<IEnumerable<Flight>> GetFlightsFrom(string url)
{
    return WebRequestFactory.GetData(new Uri(url), ParseFlightsXml);
}

The current implementation is based on Sergey's answer, and uses ForkJoin to ensure sequential execution, so that my reference data gets loaded before the flights. This implementation is a lot more elegant than having to fire a "ReferenceDataLoaded" event like my previous implementation did.

Jonas Follesø asked May 14 '10 10:05


1 Answer

I think that if you are receiving a list of entities from each REST call, your call should have a slightly different signature: you are not observing each value in the returned collection, you are observing the completion of the call. So for airports, it should have the signature:

public IObservable<Airports> GetAirports()

The next step would be to run the first three in parallel and wait on all of them:

var ports_lines_statuses = 
    Observable.ForkJoin(GetAirports(), GetAirlines(), GetStatuses());

The third step would be to compose the above observable with GetFlights():

var decoratedFlights = 
  from pls in ports_lines_statuses
  let airport = MyAirportFunc(pls)
  from flight in GetFlights(airport)
  select flight;
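
For readers on a current System.Reactive package, here is a minimal sketch of the same composition. As far as I know, ForkJoin is no longer on the main Observable class there, so the sketch swaps in Zip, which should behave the same way when every source emits exactly one value and then completes. `FlightClient`, the string-based stand-ins for the entities, and the `GetFlights` signature are hypothetical, made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq; // from the System.Reactive NuGet package

public static class FlightClient
{
    // Hypothetical stand-ins for the REST calls: each emits one list, then completes.
    public static IObservable<IList<string>> GetAirports() =>
        Observable.Return<IList<string>>(new[] { "OSL", "TRD" });

    public static IObservable<IList<string>> GetAirlines() =>
        Observable.Return<IList<string>>(new[] { "SK", "DY" });

    public static IObservable<IList<string>> GetStatuses() =>
        Observable.Return<IList<string>>(new[] { "Departed", "Delayed" });

    // Hypothetical flights call that needs the reference data to build its result.
    public static IObservable<IList<string>> GetFlights(string airport, IList<string> airlines) =>
        Observable.Return<IList<string>>(
            airlines.Select(airline => $"{airline} from {airport}").ToList());

    public static IObservable<IList<string>> GetFlightsFrom(string airport) =>
        GetAirports()
            // Subscribes to all three sources and emits once each has produced its list.
            .Zip(GetAirlines(), GetStatuses(),
                 (ports, lines, statuses) => new { ports, lines, statuses })
            // Only after the reference data has arrived is the flights call started.
            .SelectMany(refData => GetFlights(airport, refData.lines));
}
```

Nothing runs until a caller subscribes, for example with `FlightClient.GetFlightsFrom("OSL").Subscribe(flights => ...)`.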

EDIT: I still do not understand why your services return

IObservable<Airport> 

instead of

IObservable<IEnumerable<Airport>>

AFAIK, you get all entities at once from the REST call, but maybe you do paging? Anyway, if you want Rx to do the buffering, you could use .BufferWithCount():

    var allAirports = new AirportNamesService()
        .GetAirports().BufferWithCount(int.MaxValue); 
...

Then you can apply ForkJoin:

var ports_lines_statuses =  
    allAirports
        .ForkJoin(allAirlines, PortsLinesSelector)
        .ForkJoin(statuses, ...

ports_lines_statuses would contain a single event on the timeline which would contain all the reference data.

EDIT: Here's another one, using the freshly minted ListObservable (latest release only):

allAirports = airports.Start();
allAirlines = airlines.Start();
allStatuses = statuses.Start();

...
whenReferenceDataLoaded =
  Observable.Join(airports.WhenCompleted()
                 .And(airlines.WhenCompleted())
                 .And(statuses.WhenCompleted())
                 .Then((p, l, s) => new Unit()));



    public static IObservable<Unit> WhenCompleted<T>(this IObservable<T> source)
    {
        return source
            .Materialize()
            .Where(n => n.Kind == NotificationKind.OnCompleted)
            .Select(_ => new Unit());
    }
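
A minimal sketch of how the same join pattern might look against a current System.Reactive package, on the assumption that Observable.Join is exposed there as Observable.When (the And/Then pattern operators are unchanged). `CompletionDemo` and `WhenAllCompleted` are hypothetical names used only for this example.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq; // from the System.Reactive NuGet package

public static class CompletionDemo
{
    // Emits a single Unit when the source completes successfully.
    // An error is swallowed: the result then completes without emitting.
    public static IObservable<Unit> WhenCompleted<T>(this IObservable<T> source) =>
        source
            .Materialize()
            .Where(n => n.Kind == NotificationKind.OnCompleted)
            .Select(_ => Unit.Default);

    // Fires once, after all three sources have completed.
    public static IObservable<Unit> WhenAllCompleted<T1, T2, T3>(
        IObservable<T1> first, IObservable<T2> second, IObservable<T3> third) =>
        Observable.When(
            first.WhenCompleted()
                 .And(second.WhenCompleted())
                 .And(third.WhenCompleted())
                 .Then((a, b, c) => Unit.Default));
}
```

Because the sources can have different element types, this works for airports, airlines and statuses without first converting them to a common type.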

Sergey Aldoukhov answered Oct 13 '22 22:10