Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How would I organize these calls using Reactive Extensions (Rx) in Silverlight?

I have some calls that must execute sequentially. Consider an IService that has a Query and a Load method. The Query gives a list of widgets, and the load provides a "default" widget. Hence, my service looks like this.

void IService.Query(Action<IEnumerable<Widget>,Exception> callback);
void IService.Load(Action<Widget,Exception> callback); 

With that in mind, here is a rough sketch of the view model:

public class ViewModel : BaseViewModel
{
   public ViewModel()
   {
      Widgets = new ObservableCollection<Widget>();

      WidgetService.Query((widgets,exception) =>
      {
          if (exception != null) 
          {
              throw exception;
          }

          Widgets.Clear();

          foreach(var widget in widgets)
          {
             Widgets.Add(widget);
          }

          WidgetService.Load((defaultWidget,ex) =>
          {
             if (ex != null)
             {
                 throw ex;
             }
             if (defaultWidget != null)
             {
                CurrentWidget = defaultWidget;
             }
          }
      });
   }

   public IService WidgetService { get; set; } // assume this is wired up

   public ObservableCollection<Widget> Widgets { get; private set; }

   private Widget _currentWidget; 

   public Widget CurrentWidget 
   {
      get { return _currentWidget; }
      set 
      {
         _currentWidget = value; 
         RaisePropertyChanged(()=>CurrentWidget);
      }
   }
}

What I'd like to do is simplify the sequential workflow of calling query and then the default. Perhaps the best way to do this is nested with lambda expressions as I've shown, but I figured there may be a more elegant way with Rx. I don't want to use Rx for the sake of Rx, but if it can allow me to organize the logic above so it is easier to read/maintain in the method, I'll take advantage of it. Ideally, something like:

Observable.Create(
   ()=>firstAction(), 
   ()=>secondAction())
.Subscribe(action=>action(),error=>{ throw error; }); 

With the power threading library, I'd do something like:

Service.Query(list=>{result=list};
yield return 1;
ProcessList(result);
Service.Query(widget=>{defaultWidget=widget};
yield return 1;
CurrentWidget = defaultWidget;

That makes it far more evident that the workflow is sequential and eliminates nesting (the yields are part of the async enumerator and are boundaries that block until the results come back).

Anything similar would make sense to me.

So the essence of the question: am I trying to fit a square peg into a round hole, or is there a way to redefine the nested asynchronous calls using Rx?

like image 714
Jeremy Likness Avatar asked Aug 16 '10 22:08

Jeremy Likness


2 Answers

You can convert methods of service so they return IObservable instead of taking callback as a parameter. In this case sequential workflow can be implemented using SelectMany, something like this...

        WidgetService.Query()
            .SelectMany(
                widgets =>
                {
                    Widgets.Clear();
                    foreach (var w in widgets)
                    {
                        Widgets.Add(w);
                    }

                    return WidgetService.Load();
                }
            )
            .Do(
                defaultWidget =>
                {
                    if (defaultWidget != null)
                        Default = defaultWidget;
                }
            )
            .Subscribe(
                _ => { },
                e => { throw e; }
            );

However IMO F# asyncs will look much more clear (in sample I assume that methods of service returns Async> and Async respectively). Note that sample doesn't take in account what thread is modifying data fields, in real-world code you should pay attention to this:

    let load = async {
            let! widgets = WidgetService.Query()

            Widgets.Clear()
            for w in widgets do
                Widgets.Add(w)

            let! defaultWidget = WidgetService.Load()
            if defaultWidget <> null then
                Default <- defaultWidget

            return ()
        }

    Async.StartWithContinuations(
        load, 
        ignore, // success continuation - ignore result
        raise,  // error continuation - reraise exception
        ignore  // cancellation continuation - ignore
        )

EDITED

In fact it is possible to use technique with iterators you mentioned in your question:

    private IEnumerable<IObservable<object>> Intialize()
    {
        var widgetsList = WidgetService.Query().Start();
        yield return widgetsList;

        Widgets.Clear();
        foreach (var w in widgetsList[0])
        {
            Widgets.Add(w);
        }

        var defaultWidgetList = WidgetService.Load().Start();
        yield return defaultWidgetList;

        if (defaultWidgetList[0] != null)
            Default = defaultWidgetList[0];
    }

    Observable
        .Iterate(Intialize)
        .Subscribe(
        _ => { },
        ex => { throw ex; }
        );
like image 56
desco Avatar answered Jan 03 '23 12:01

desco


You could also do this using ReactiveXaml, though since your CurrentWidget and Widgets are both mutable, you can't make it as clean (there's a class called ObservableAsPropertyHelper which will update a property based on an IObservable and fire the RaisePropertyChanged):

public class ViewModel
{
    public ViewModel()
    {
        // These return a Func that wraps an async call in an IObservable<T>
        // that always yields only one item (the result of the call)
        var QueryAsObservable = Observable.FromAsyncCommand<IEnumerable<Widget>>(WebService.BeginQuery, WebService.EndQuery);
        var LoadAsObservable = Observable.FromAsyncCommand<Widget>(WebService.BeginLoad, WebService.EndLoad);

        // Create a new command 
        QueryAndLoad = new ReactiveAsyncCommand();

        // QueryAndLoad fires every time someone calls ICommand.Execute
        // The .Do is the hacky part, for sync calls it's hidden by RegisterAsyncFunction
        var async_results = QueryAndLoad.SelectMany(_ => QueryAsObservable())
                                        .Do(_ => DoTranslate.AsyncCompletedNotification.OnNext(new Unit()));

        // Query up the Widgets 
        async_results.Subscribe(x => x.Run(Widgets.Add));

        // Now execute the Load
        async_results.SelectMany(_ => LoadAsObservable())
                     .Subscribe(x => CurrentWidget = x);

        QueryAndLoad.Execute();
    }

    public ReactiveAsyncCommand QueryAndLoad {get; private set; }

    public ObservableCollection<Widget> Widgets {get; private set; }

    public Widget CurrentWidget {get; set; }
}
like image 25
Ana Betts Avatar answered Jan 03 '23 12:01

Ana Betts