Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

What is the best way to call async methods using reactiveui + throttle

I'm trying to solve my first task using Rx + ReactiveUI and am looking for best practices to solve a task, showing a input box that will show up suggestions as soon as the user starts typing.

According to the following code sample, what is the best way to load suggestions asynchronosly? Using Subscribe or using Select Many? Or is there a better way to do this beneth these two?

     this.SearchTerms = this.ObservableForProperty(x => x.SearchTerm)
            .Throttle(SuggestionThrottle, RxApp.MainThreadScheduler)
            .Value()
            .SelectMany(async s => await this.LoadSearchSuggestions(s));  // 1st Possibility

        this.SearchTerms.Subscribe(this.LoadSearchSuggestions);           // 2nd Possibility
like image 688
road242 Avatar asked Dec 23 '14 10:12

road242


1 Answers

You must call Subscribe either way.

Queries in Rx use lazy evaluation, which means that merely defining a query does not start it. Lazy evaluation allows you to build a query by applying operators conditionally, to define a query only once and store it in a field for later, or to pass a reference around without causing any side effects until you call Subscribe.

Without calling Subscribe your query will remain inactive.

Subscribe activates the query by passing to the observable your IObserver<T>, or you can use its overloads that allow you to supply OnNext, OnError and/or OnCompleted handlers individually, which Rx converts into an IObserver<T> for you. It is the IObserver<T> that receives the query's notifications.

There is a parameterless overload of Subscribe that internally uses a silent observer, with the intention of starting your query for its side effects only. For example, in your case if you were to use SelectMany to do all of the work of loading the suggestions and you had no need for a separate IObserver<T>, then you would start the query by calling the parameterless overload of Subscribe.

In most cases you shouldn't use the parameterless overload of Subscribe. The point of Subscribe is that the IObserver<T> (or the individual handlers) that you pass to it are intended to cause the side effects of your query. By only causing side effects in either Subscribe or, for instance, the Do operator, a query is much easier to reason about and to maintain.

However, there is one fairly common scenario where using the parameterless overload of Subscribe makes sense: If the side effects of your query are caused by an asynchronous method, then using SelectMany along with the parameterless overload of Subscribe is best.

The reason is simple: SelectMany is the sequential composition operator, which enables you to call an asynchronous method as a sequential step inside your query. Therefore, SelectMany ties the cancellation of the subscription to the cancellation of your asynchronous computation. Disposing of a subscription (represented by the IDisposble that is returned from the call to Subscribe) causes the CancellationToken provided by special asynchronous overloads of the SelectMany operator to be signaled for cancellation. Your asynchronous method can monitor the CancellationToken to exit its computation early.

There aren't any overloads of Subscribe that accept an asynchronous observer, whereby OnNext returns a Task. But even if you were to call a void-returning asynchronous method in your OnNext handler, your asynchronous method wouldn't be signaled when the subscription is disposed.

Note that both of your code samples are slightly wrong anyway. The first example has no need for the async and await keywords. As stated above, there are special overloads of SelectMany that accept a Task<T>-returning selector function, and additional overloads that provide a CancellationToken to that function. You should probably be using the latter.

Your second example shouldn't compile, presuming that LoadSearchSuggestions returns Task. (Unless the ReactiveUI library or some other library that you're referencing is providing an overload of Subscribe that accepts a Task-returning function, in which case you'll have to consult their documentation.)

Barring the latter, and assuming that the rest of your query is correct, here's what you should do:

this.SearchTerms = this.ObservableForProperty(x => x.SearchTerm)
  .Throttle(SuggestionThrottle, RxApp.MainThreadScheduler)
  .Value()
  .SelectMany(LoadSearchSuggestionsAsync)
  .Subscribe();

where LoadSearchSuggestionsAsync is defined like this:

async Task<Unit> LoadSearchSuggestionsAsync(string term, CancellationToken cancel)
{
  ...
  return Unit.Default;
}

Note that Unit represents void in Rx. It's required because an asynchronous method that returns a non-generic Task cannot be used with SelectMany. If you have actual data to return instead, then simply replace Unit with the type of your data. Then you can also pass an OnNext handler to Subscribe and do something with the return value, such as logging.

like image 114
Dave Sexton Avatar answered Oct 20 '22 05:10

Dave Sexton