How to use yield to return a collection of items from a parallel block or Task

I'm looking for help on how to use the yield keyword to return an IEnumerable from parallel blocks or a Task block. Below is the pseudocode:

public IEnumerable<List<string>> ReadFile()
{
    foreach (string filepath in lstOfFiles)
    {
        // Dispose each stream once its items have been enumerated.
        using (var stream = new FileStream(filepath, FileMode.Open, FileAccess.Read))
        {
            foreach (var item in ReadStream(stream))
                yield return item; // item is of type List<string>
        }
    }
}

I want to convert the code above to a parallel block like the one below:

lstOfFiles.AsParallel()
          .ForAll(filepath =>
{
    var stream = new FileStream(filepath, FileMode.Open, FileAccess.Read);
    foreach (var item in ReadStream(stream))
        yield return item; // compile error: yield is not allowed inside a lambda
});

but the compiler reports an error saying that yield cannot be used in parallel blocks or anonymous delegates. I also tried a Task block, but yield is not allowed in a Task's anonymous delegate either.

Can anyone suggest a simple, good way to use yield to return a collection of data from parallel blocks or a Task?

I read that Rx 2.0 or the TPL are good choices for this scenario. I'm unsure whether to use Rx or the TPL to return yielded values asynchronously. Can anyone suggest which is better, Rx or the TPL?

If I use Rx, is it necessary to call Subscribe and convert the parallel block with AsObservable?

user145610 asked Nov 09 '22 21:11


1 Answer

To use Rx, you'll have to use IObservable<T> instead of IEnumerable<T>.

public IObservable<Item> ReadFiles()
{
  return from filepath in lstOfFiles.ToObservable()
         from item in Observable.Using(() => File.OpenRead(filepath), ReadStream)
         select item;
}

Each time that you call Subscribe on the observable returned by ReadFiles, it will iterate over all of the strings in lstOfFiles and, in parallel*, read each file stream.

Sequentially, the query opens each file stream and passes it to ReadStream, which is responsible for generating the asynchronous sequence of items for a given stream.

The ReadFiles query, which uses the SelectMany operator written in query comprehension syntax, merges each "item" that is generated by all ReadStream observables into a single observable sequence, respecting the asynchrony of the source.
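For readers less familiar with query comprehension syntax, the two `from` clauses are translated into a `SelectMany` call. Below is a minimal sketch of that equivalence, assuming the System.Reactive NuGet package is referenced; `FakeReadStream` and `ReadAll` are hypothetical stand-ins for illustration and are not part of the answer's code.

```csharp
using System;
using System.Reactive.Linq;

public static class SelectManySketch
{
    // Hypothetical stand-in for ReadStream: emits two items per "file".
    public static IObservable<string> FakeReadStream(string name) =>
        Observable.Range(1, 2).Select(i => name + ":" + i);

    // Equivalent to the query syntax:
    //   from name in names.ToObservable()
    //   from item in FakeReadStream(name)
    //   select item;
    public static IObservable<string> ReadAll(string[] names) =>
        names.ToObservable().SelectMany(name => FakeReadStream(name));
}
```

Subscribing to `ReadAll(new[] { "a", "b" })` should produce four items in total; the relative order of items from different inner sequences is not something to rely on.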

You should strongly consider writing an async iterator for your ReadStream method as I've shown here; otherwise, if you must return IEnumerable<T>, then you'll have to convert it by applying the ToObservable(scheduler) operator with a concurrency-introducing scheduler, which may be less efficient.
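If an existing reader really must stay an IEnumerable<T>, the conversion mentioned above might look roughly like this. It is only a sketch, assuming the System.Reactive package; `ReadLines` and `ReadLinesObservable` are hypothetical names, and `TaskPoolScheduler.Default` is just one possible concurrency-introducing scheduler.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class EnumerableBridgeSketch
{
    // Hypothetical synchronous reader: yields one line at a time.
    public static IEnumerable<string> ReadLines(TextReader reader)
    {
        for (var line = reader.ReadLine(); line != null; line = reader.ReadLine())
            yield return line;
    }

    // The enumeration runs on the task pool rather than on the subscribing thread.
    public static IObservable<string> ReadLinesObservable(TextReader reader) =>
        ReadLines(reader).ToObservable(TaskPoolScheduler.Default);
}
```

The iterator still performs blocking reads, so a pooled thread is tied up while each read is in progress, which is why this route may be less efficient than a truly asynchronous ReadStream.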

public IObservable<Item> ReadStream(Stream stream)
{
  return Observable.Create<Item>(async (observer, cancel) =>
  {
    // Here's one example of reading a stream with fixed item lengths.

    var buffer = new byte[itemLength]; // TODO: Define itemLength
    var remainder = itemLength;
    int read;

    do
    {
      read = await stream.ReadAsync(buffer, itemLength - remainder, remainder, cancel)
                         .ConfigureAwait(false);

      remainder -= read;

      if (read == 0)
      {
        if (remainder < itemLength)
        {
          throw new InvalidOperationException("End of stream unexpected.");
        }
        else
        {
          break;
        }
      }
      else if (remainder == 0)
      {
        observer.OnNext(ReadItem(buffer));  // TODO: Define ReadItem

        remainder = itemLength;
      }
    }
    while (true);
  });
}

* Rx does not introduce any concurrency here. Parallelization is simply a result of the asynchronous nature of the underlying API, so it's very efficient. Reading from a file stream asynchronously may cause Windows to use an I/O completion port as an optimization, notifying on a pooled thread when each buffer becomes available. This ensures that Windows is entirely responsible for scheduling callbacks to your application, rather than the TPL or yourself.

Rx is free-threaded, so every notification to your observer may be on a different pooled thread; however, due to Rx's serialization contract (§4.2 Rx Design Guidelines), you will not receive overlapping notifications in your observer when you call Subscribe, so there's no need to provide explicit synchronization, such as locking.

However, due to the parallelized nature of this query, you may observe alternating notifications with respect to each file, but never overlapping notifications.
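The serialization guarantee can be illustrated with a small sketch, assuming the System.Reactive package. Four hypothetical "files" each produce 1000 items on pooled threads, and the handler appends to a plain `List<int>` without any locking; `CountWithoutLocksAsync` is an invented name for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class SerializationSketch
{
    public static async Task<int> CountWithoutLocksAsync()
    {
        var seen = new List<int>(); // not thread-safe, deliberately unguarded

        // Four hypothetical "files", each producing 1000 items on pooled threads.
        var merged = Observable.Range(0, 4)
            .SelectMany(file => Observable.Range(0, 1000, TaskPoolScheduler.Default));

        // Notifications may arrive on different threads but never overlap.
        await merged.ForEachAsync(item => seen.Add(item));
        return seen.Count;
    }
}
```

Items from different inner sequences may interleave, so only the total count is predictable here, not the order.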

If you'd rather receive all items for a given file at once, as you've hinted at in your question, then you can simply apply the ToList operator to the query and change the return type:

public IObservable<IList<Item>> ReadFiles()
{
  return from filepath in lstOfFiles.ToObservable()
         from items in Observable.Using(() => File.OpenRead(filepath), ReadStream)
                                 .ToList()
         select items;
}

If you need to observe notifications with thread affinity (on a GUI thread, for instance), then you must marshal the notifications because they will be arriving on a pooled thread. Since this query does not introduce concurrency itself, the best way to achieve this is to apply the ObserveOnDispatcher operator (WPF, Store Apps, Phone, Silverlight) or the ObserveOn(SynchronizationContext) overload (WinForms, ASP.NET, etc.). Just don't forget to add a reference to the appropriate platform-specific NuGet package; e.g., Rx-Wpf, Rx-WinForms, Rx-WindowsStore, etc.
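As a rough sketch of the ObserveOn(SynchronizationContext) route, assuming the System.Reactive package: the helper below marshals every notification through a supplied context before it reaches the handler. `ObserveVia` is a hypothetical name, and the caller is assumed to have captured the context on the GUI thread.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public static class MarshalSketch
{
    // Routes each notification through the given context before calling onNext.
    public static IDisposable ObserveVia<T>(
        IObservable<T> source, SynchronizationContext context, Action<T> onNext) =>
        source.ObserveOn(context).Subscribe(onNext);
}
```

In a WinForms application, for example, one would typically capture `SynchronizationContext.Current` on the UI thread and pass it as `context`, so that `onNext` can touch controls safely.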

You may be tempted to convert the observable back into an IEnumerable<T> instead of calling Subscribe. Do not do this. In most cases it's unnecessary, it can be inefficient, and in the worst case it could cause deadlocks. Once you enter the world of asynchrony, you should try to stay in it. This is true not just for Rx but also for async/await.
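One way to stay asynchronous when the caller is an async method is to await the observable directly. This is a minimal sketch, assuming the System.Reactive package (which supplies the awaiter for IObservable<T>); `CollectAsync` is a hypothetical helper name.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class AwaitSketch
{
    // Awaiting an observable yields its last element;
    // ToList makes that last element the complete list of items.
    public static async Task<IList<int>> CollectAsync(IObservable<int> source) =>
        await source.ToList();
}
```

The calling thread is not blocked while items arrive, which is the property that a conversion back to IEnumerable<T> would give up.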

Dave Sexton answered Nov 14 '22 22:11