
How to yield return item when doing Task.WhenAny

I have two projects in my solution: WPF project and class library.

In my class library:

I have a List of Symbol:

    class Symbol
    {
        Identifier Identifier { get; set; }
        List<Quote> HistoricalQuotes { get; set; }
        List<Financial> HistoricalFinancials { get; set; }
    }

For each symbol, I query a financial service for historical financial data using a web request (webClient.DownloadStringTaskAsync(uri)).

So here's my method that does that:

    public async Task<IEnumerable<Symbol>> GetSymbolsAsync()
    {
        var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();

        foreach (var symbol in await _listSymbols)
        {
            historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
        }

        while (historicalFinancialTask.Count > 0)
        {
            var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
            historicalFinancialTask.Remove(historicalFinancial);

            // the line below doesn't compile, which is understandable because method's return type is a Task of something
            yield return new Symbol(
                historicalFinancial.Result.Symbol.Identifier,
                historicalFinancial.Result.Symbol.HistoricalQuotes,
                historicalFinancial.Result.Data);
        }
    }

    private async Task<HistoricalFinancialResult> GetFinancialsQueryAsync(Symbol symbol)
    {
        var result = new HistoricalFinancialResult();
        result.Symbol = symbol;
        result.Data = await _financialsQuery.GetFinancialsQuery(symbol.Identifier); // contains some logic like parsing and use WebClient to query asynchronously
        return result;
    }

    private class HistoricalFinancialResult
    {
        public Symbol Symbol { get; set; }
        public IEnumerable<Financial> Data { get; set; }

        // equality members
    }

As you can see, I want to yield each symbol's result as soon as its historical financial data has been downloaded, instead of waiting for all my calls to the financial service to complete.

And in my WPF project, here's what I would like to do:

    foreach (var symbol in await _service.GetSymbolsAsync())
    {
        SymbolsObservableCollection.Add(symbol);
    }

It seems we can't yield return in an async method, so what solution can I use, other than moving my GetSymbols method into my WPF project?

John asked Aug 17 '13 01:08





2 Answers

While I like the TPL Dataflow components (which svick suggests you use), moving over to that system does require a substantial commitment - it's not something you can just add to an existing design. It offers considerable benefits if you're performing high volumes of CPU-intensive data processing and want to exploit many CPU cores. But getting the best out of it is non-trivial.

His other suggestion, using Rx, might be easier to integrate with an existing solution. (See the original documentation, but for the latest code, use the Rx-Main NuGet package. Or if you'd like to look at the source, see the Rx CodePlex site.) It would even be possible for the calling code to carry on using an IEnumerable<Symbol> if you want - you can use Rx purely as an implementation detail, [edit 2013/11/09 to add:] although as svick has pointed out, that's probably not a good idea, given your end goal.

Before I show you an example, I want to be clear about what exactly we're doing. Your example had a method with this signature:

public async Task<IEnumerable<Symbol>> GetSymbolsAsync() 

That return type, Task<IEnumerable<Symbol>>, essentially says "This is a method that produces a single result of type IEnumerable<Symbol>, and it may not produce that result immediately."

It's that single result bit that I think is causing you grief, because that's not really what you want. A Task<T> (no matter what T may be) represents a single asynchronous operation. It may have many steps (many uses of await if you implement it as a C# async method) but ultimately it produces one thing. You want to produce multiple things, at different times, so Task<T> is not a good fit.

If you were really going to do what your method signature promises - producing one result eventually - one way you could do this is to have your async method build a list and then produce that as the result when it's good and ready:

    // Note: this first example is *not* what you want.
    // However, it is what your method's signature promises to do.
    public async Task<IEnumerable<Symbol>> GetSymbolsAsync()
    {
        var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();

        foreach (var symbol in await _listSymbols)
        {
            historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
        }

        var results = new List<Symbol>();
        while (historicalFinancialTask.Count > 0)
        {
            var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
            historicalFinancialTask.Remove(historicalFinancial);

            results.Add(new Symbol(
                historicalFinancial.Result.Symbol.Identifier,
                historicalFinancial.Result.Symbol.HistoricalQuotes,
                historicalFinancial.Result.Data));
        }

        return results;
    }

This method does what its signature says: it asynchronously produces a sequence of symbols.

But presumably you'd like to create an IEnumerable<Symbol> that produces the items as they become available, rather than waiting until they're all available. (Otherwise, you might as well just use WhenAll.) You can do that, but yield return is not the way.
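For comparison, here is a minimal sketch of the WhenAll alternative mentioned in passing. It is not taken from the question's code: FetchAsync and GetAllAsync are hypothetical names, and Task.Delay stands in for the real download. It shows why WhenAll fits the "one result, eventually" shape: nothing is available until every task has finished.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class WhenAllSketch
{
    // Hypothetical stand-in for a download: completes after the given delay.
    static async Task<string> FetchAsync(string name, int delayMs)
    {
        await Task.Delay(delayMs);
        return name;
    }

    public static async Task<IReadOnlyList<string>> GetAllAsync()
    {
        // Both operations start as soon as the list is built.
        var tasks = new List<Task<string>>
        {
            FetchAsync("slow", 300),
            FetchAsync("fast", 50),
        };

        // WhenAll completes only once every task has finished. The results
        // array follows the order of the input tasks, not completion order.
        string[] results = await Task.WhenAll(tasks);
        return results;
    }
}
```

The caller receives the whole list in one go, which is simpler than the WhenAny loop but gives up the ability to react to each item as it arrives.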

In short, what I think you want to do is produce an asynchronous list. There's a type for that: IObservable<T> expresses exactly what I believe you were hoping to express with your Task<IEnumerable<Symbol>>: it's a sequence of items (just like IEnumerable<T>) but asynchronous.

It may help to understand it by analogy:

public Symbol GetSymbol() ... 

is to

public Task<Symbol> GetSymbolAsync() ... 

as

public IEnumerable<Symbol> GetSymbols() ... 

is to:

public IObservable<Symbol> GetSymbolsObservable() ... 

(Unfortunately, unlike with Task<T> there isn't a common naming convention for what to call an asynchronous sequence-oriented method. I've added 'Observable' on the end here, but that's not universal practice. I certainly wouldn't call it GetSymbolsAsync because people will expect that to return a Task.)

To put it another way, Task<IEnumerable<T>> says "I'll produce this collection when I'm good and ready" whereas IObservable<T> says: "Here's a collection. I'll produce each item when I'm good and ready."

So, you want a method that returns a sequence of Symbol objects, where those objects are produced asynchronously. That tells us that you should really be returning an IObservable<Symbol>. Here's an implementation:

    // Unlike the first example, this *is* what you want.
    public IObservable<Symbol> GetSymbolsRx()
    {
        return Observable.Create<Symbol>(async obs =>
        {
            var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();

            foreach (var symbol in await _listSymbols)
            {
                historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
            }

            while (historicalFinancialTask.Count > 0)
            {
                var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
                historicalFinancialTask.Remove(historicalFinancial);

                obs.OnNext(new Symbol(
                    historicalFinancial.Result.Symbol.Identifier,
                    historicalFinancial.Result.Symbol.HistoricalQuotes,
                    historicalFinancial.Result.Data));
            }
        });
    }

As you can see, this lets you write pretty much what you were hoping to write - the body of this code is almost identical to yours. The only difference is that where you were using yield return (which didn't compile), this calls the OnNext method on an object supplied by Rx.

Having written that, you can easily wrap this in an IEnumerable<Symbol> ([Edited 2013/11/29 to add:] although you probably don't actually want to do this - see addition at end of answer):

    public IEnumerable<Symbol> GetSymbols()
    {
        return GetSymbolsRx().ToEnumerable();
    }

This may not look asynchronous, but it does in fact allow the underlying code to operate asynchronously. When you call this method, it will not block - even if the underlying code that does the work of fetching the financial information cannot produce a result immediately, this method will nonetheless immediately return an IEnumerable<Symbol>. Now of course, any code that attempts to iterate through that collection will end up blocking if data is not yet available. But the critical thing is that it does what I think you were originally trying to achieve:

  • You get to write an async method that does the work (a delegate in my example, passed as an argument to Observable.Create<T> but you could write a standalone async method if you prefer)
  • The calling code will not be blocked merely as a result of asking you to start fetching the symbols
  • The resulting IEnumerable<Symbol> will produce each individual item as soon as it becomes available

This works because Rx's ToEnumerable method has some clever code in it that bridges the gap between the synchronous world view of IEnumerable<T> and asynchronous production of results. (In other words, this does exactly what you were disappointed to discover C# wasn't able to do for you.)

If you're curious, you can look at the source. The code that underlies what ToEnumerable does can be found at https://rx.codeplex.com/SourceControl/latest#Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs
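To illustrate the general idea of such a bridge without Rx, here is a minimal sketch built on BlockingCollection<T> from the base class library. This is not how Rx implements ToEnumerable; it is only an assumption-laden illustration of the same shape. GetNumbers and ProduceAsync are hypothetical names, and Task.Delay stands in for real asynchronous work.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class BlockingBridgeSketch
{
    // Returns immediately; the caller only blocks while enumerating,
    // and only when the next item has not been produced yet.
    public static IEnumerable<int> GetNumbers()
    {
        var queue = new BlockingCollection<int>();

        // Start the asynchronous producer without waiting for it.
        // (Errors are not propagated to the consumer in this sketch.)
        _ = ProduceAsync(queue);

        return queue.GetConsumingEnumerable();
    }

    static async Task ProduceAsync(BlockingCollection<int> queue)
    {
        try
        {
            for (int i = 1; i <= 3; i++)
            {
                // ConfigureAwait(false) keeps the continuation off the caller's
                // context, so a consumer blocking that thread cannot deadlock it.
                await Task.Delay(100).ConfigureAwait(false);
                queue.Add(i);
            }
        }
        finally
        {
            // Lets the consuming enumeration end instead of blocking forever.
            queue.CompleteAdding();
        }
    }
}
```

A foreach over GetNumbers() receives each value as it is added and finishes after CompleteAdding is called. The consumer still blocks between items, which is the limitation of any IEnumerable<T>-based approach.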

[Edited 2013/11/29 to add:]

svick has pointed out in the comments something I missed: your final goal is to put the contents into an ObservableCollection<Symbol>. Somehow I didn't see that bit. That means IEnumerable<T> is the wrong way to go - you want to populate the collection as items become available, rather than going through it with a foreach loop. So you'd just do this:

GetSymbolsRx().Subscribe(symbol => SymbolsObservableCollection.Add(symbol)); 

or something along those lines. That will add items to the collection as and when they become available.

This depends on the whole thing being kicked off on the UI thread by the way. As long as it is, your async code should end up running on the UI thread, meaning that when items are added to the collection, that also happens on the UI thread. But if for some reason you end up launching things from a worker thread (or if you were to use ConfigureAwait on any of the awaits, thus breaking the connection with the UI thread) you'd need to arrange to handle the items from the Rx stream on the right thread:

    GetSymbolsRx()
        .ObserveOnDispatcher()
        .Subscribe(symbol => SymbolsObservableCollection.Add(symbol));

If you're on the UI thread when you do that, it'll pick up the current dispatcher, and ensure all notifications arrive through it. If you're already on the wrong thread when you come to subscribe, you can use the ObserveOn overload that takes a dispatcher. (These require you to have a reference to System.Reactive.Windows.Threading. And these are extension methods, so you'll need a using for their containing namespace, which is also called System.Reactive.Windows.Threading)

Ian Griffiths answered Sep 21 '22 07:09



What you're asking for doesn't make much sense, because IEnumerable<T> is a synchronous interface. In other words, if an item is not available yet, the MoveNext() method has to block; it has no other choice.

What you need is some sort of asynchronous version of IEnumerable<T>. For that, you can use IObservable<T> from Rx or (my favorite) a block from TPL Dataflow. With that, your code could look like this (I have also changed some variables to better names):

    public IReceivableSourceBlock<Symbol> GetSymbolsAsync()
    {
        var block = new BufferBlock<Symbol>();

        // If the core method fails, propagate the failure to the block.
        GetSymbolsAsyncCore(block).ContinueWith(
            task => ((IDataflowBlock)block).Fault(task.Exception),
            TaskContinuationOptions.NotOnRanToCompletion);

        return block;
    }

    private async Task GetSymbolsAsyncCore(ITargetBlock<Symbol> block)
    {
        // snip

        while (historicalFinancialTasks.Count > 0)
        {
            var historicalFinancialTask =
                await Task.WhenAny(historicalFinancialTasks);
            historicalFinancialTasks.Remove(historicalFinancialTask);
            var historicalFinancial = historicalFinancialTask.Result;

            var symbol = new Symbol(
                historicalFinancial.Symbol.Identifier,
                historicalFinancial.Symbol.HistoricalQuotes,
                historicalFinancial.Data);

            await block.SendAsync(symbol);
        }

        // Signal that no more items will arrive, so that consumers waiting on
        // OutputAvailableAsync() or Completion can finish.
        block.Complete();
    }

And usage could be:

    var symbols = _service.GetSymbolsAsync();
    while (await symbols.OutputAvailableAsync())
    {
        Symbol symbol;
        while (symbols.TryReceive(out symbol))
            SymbolsObservableCollection.Add(symbol);
    }

Or:

    var symbols = _service.GetSymbolsAsync();
    var addToCollectionBlock = new ActionBlock<Symbol>(
        symbol => SymbolsObservableCollection.Add(symbol));
    symbols.LinkTo(
        addToCollectionBlock,
        new DataflowLinkOptions { PropagateCompletion = true });
    await symbols.Completion;
svick answered Sep 24 '22 07:09
