Observable.FromAsync vs Task.ToObservable

Does anyone have a steer on when to use one of these methods over the other? They seem to do the same thing, in that both convert a TPL Task to an Observable.

Observable.FromAsync appears to support cancellation tokens, which might be the subtle difference: it allows the method generating the task to participate in cooperative cancellation if the observable is disposed.

Just wondering if I'm missing something obvious as to why you'd use one over the other.

Thanks

user630190 asked Jan 21 '16 18:01


4 Answers

Observable.FromAsync accepts a task factory (a delegate, not the TaskFactory class) in the form of a Func<Task> or Func<Task<TResult>>. In this case, the task is only created and executed when the observable is subscribed to.

Whereas .ToObservable() requires an already created (and thus started) Task.
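
The timing difference can be seen in a minimal sketch, assuming the System.Reactive NuGet package is referenced. GetValueAsync, the call counter, and the class name are hypothetical and exist only for illustration:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class LazyVsEagerDemo
{
    static int calls;

    // Hypothetical async method; counts how many times it is invoked.
    static async Task<int> GetValueAsync()
    {
        calls++;
        await Task.Delay(50);
        return 42;
    }

    // Returns the call count observed at three points in time.
    public static async Task<int[]> RunAsync()
    {
        calls = 0;

        IObservable<int> lazy = Observable.FromAsync(() => GetValueAsync());
        int afterFromAsync = calls;      // 0: the factory has not run yet

        IObservable<int> eager = GetValueAsync().ToObservable();
        int afterToObservable = calls;   // 1: the call itself started the task

        await lazy;   // each subscription invokes the factory again
        await lazy;
        await eager;  // only observes the task that is already running or done
        int afterAwaits = calls;         // 3: one eager call plus two lazy ones

        return new[] { afterFromAsync, afterToObservable, afterAwaits };
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(", ", await RunAsync())); // 0, 1, 3
    }
}
```

Awaiting an observable subscribes to it, which is why the FromAsync version runs the method once per await while the ToObservable version never re-runs it.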

Sickboy answered Oct 31 '22 14:10


@Sickboy's answer is correct.

  • Observable.FromAsync() will start the task at the moment of subscription.
  • Task.ToObservable() needs an already running task.

One use for Observable.FromAsync is to control reentrancy for multiple calls to an async method.

This is an example where these two methods are not equivalent:

//ob is some IObservable<T>

//ExecuteQueryAsync is some async method
//Here, ExecuteQueryAsync will run **serially**: the second call will start
//only when the first one has finished. This is an important property
//if ExecuteQueryAsync doesn't support reentrancy
ob
.Select(x => Observable.FromAsync(() => ExecuteQueryAsync(x)))
.Concat()
.ObserveOnDispatcher()
.Subscribe(action);

vs

//ob is some IObservable<T>

//ExecuteQueryAsync is some async method
//Although the `Subscribe` action order will be the same as in the first
//example because of the `Concat`, the ExecuteQueryAsync calls could run in
//parallel: the second call to the method could start before the end of the
//first call.
ob
.Select(x => ExecuteQueryAsync(x).ToObservable())
.Concat()
.Subscribe(action);

Note that in the first example one may need ObserveOn() or ObserveOnDispatcher() to ensure that the action is executed on the original dispatcher. Observable.FromAsync doesn't await the task, so the continuation may run on an arbitrary thread rather than on the dispatcher that subscribed.
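
The serial-versus-overlapping behaviour of the two pipelines can be checked with a small console sketch, assuming the System.Reactive NuGet package. The ExecuteQueryAsync below is a hypothetical stand-in that records how many calls are in flight at once, and MeasureAsync is a helper invented for this illustration:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class ReentrancyDemo
{
    static readonly object Gate = new object();
    static int running, maxRunning;

    // Hypothetical stand-in for ExecuteQueryAsync; records peak concurrency.
    static async Task<int> ExecuteQueryAsync(int x)
    {
        lock (Gate) { running++; maxRunning = Math.Max(maxRunning, running); }
        await Task.Delay(50);
        lock (Gate) { running--; }
        return x;
    }

    // Runs three queries through Concat and returns the peak number in flight.
    public static async Task<int> MeasureAsync(Func<int, IObservable<int>> wrap)
    {
        lock (Gate) { running = 0; maxRunning = 0; }
        await Observable.Range(1, 3).Select(wrap).Concat().ToList();
        lock (Gate) { return maxRunning; }
    }

    public static async Task Main()
    {
        int serial = await MeasureAsync(x => Observable.FromAsync(() => ExecuteQueryAsync(x)));
        int eager = await MeasureAsync(x => ExecuteQueryAsync(x).ToObservable());
        Console.WriteLine($"FromAsync peak: {serial}, ToObservable peak: {eager}");
    }
}
```

Because Observable.Range pushes all three values as soon as it is subscribed, the ToObservable variant starts every call up front and is expected to report a peak of 3, while the FromAsync variant only starts a call when Concat subscribes to it and is expected to report 1.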

Rafael answered Oct 31 '22 13:10


Looking at the code, it appears that (at least in some flows) Observable.FromAsync calls into .ToObservable()*. I am sure the intent is that they should be semantically equivalent (assuming you pass the same parameters, e.g. Scheduler, CancellationToken etc.).

One is better suited to chaining/fluent syntax; the other may read better in isolation. Use whichever your coding style favors.

*https://github.com/Reactive-Extensions/Rx.NET/blob/859e6159cb07be67fd36b18c2ae2b9a62979cb6d/Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Async.cs#L727

Lee Campbell answered Oct 31 '22 13:10


Aside from being able to use a CancellationToken, FromAsync wraps the call in a Defer, which allows the task logic to change based on conditions at the time of subscription. Note that FromAsync does not start the Task itself; internally, task.ToObservable is called on whatever the Func returns. The Func does, though, allow you to start the task when you create it.
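
The cancellation side can be sketched as follows, assuming the System.Reactive NuGet package and the FromAsync overload that takes a Func<CancellationToken, Task<TResult>>. The class and method names are hypothetical, and the 10-second delay simply stands in for slow work:

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // Subscribes, disposes straight away, and reports whether the token
    // handed to the factory was cancelled by the disposal.
    public static bool DisposeCancelsToken()
    {
        CancellationToken seen = default;

        IObservable<int> source = Observable.FromAsync(async token =>
        {
            seen = token;   // capture the token Rx supplies to the factory
            await Task.Delay(TimeSpan.FromSeconds(10), token);
            return 1;
        });

        IDisposable subscription = source.Subscribe(_ => { });
        subscription.Dispose();   // requests cancellation of that token

        return seen.IsCancellationRequested;
    }

    public static void Main()
    {
        Console.WriteLine(DisposeCancelsToken()); // True
    }
}
```

Task.ToObservable has no equivalent hook: disposing its subscription only stops observing the task, so any cancellation has to be wired up by the code that created the task.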

user487779 answered Oct 31 '22 14:10