 

Converting an IEnumerable<T> to IObservable<T>, with maximum parallelism

I have a sequence of async tasks to run (say, fetching N web pages), and I want to expose all of them as an IObservable<T>. My current solution uses the answer from this question:

async Task<ResultObj> GetPage(string page) {
    Console.WriteLine("Before");
    var result = await FetchFromInternet(page);
    Console.WriteLine("After");
    return result;
}

// pages is an IEnumerable<string>
IObservable<ResultObj> resultObservable = pages
    .Select(GetPage)
    .Select(t => Observable.FromAsync(() => t))
    .Merge();

// Now consume the list
foreach(ResultObj obj in resultObservable.ToEnumerable()) {
    Console.WriteLine(obj.ToString());
}

The problem is that I do not know how many pages will be fetched, and there could be a lot of them. I do not want to make hundreds of simultaneous requests, so I need a way to limit the maximum number of tasks that execute in parallel. Is there a way to limit the number of concurrent invocations of GetPage?

There is a Merge overload that takes a maxConcurrent parameter, but it does not seem to actually limit the concurrency of the function invocation: the console prints all the Before messages before any of the After messages.

Note: I need to convert back to IEnumerable<T>. I'm writing a data source for a system that gives me descriptors of data to fetch, and I need to give it back a list of the downloaded data.

felipe asked Mar 18 '23 16:03


1 Answer

EDIT

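The following should work. This Merge overload limits the number of inner observables that are subscribed to at the same time (maxConcurrent is the maximum number of pages you want to fetch at once):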

var resultObservable = pages
  .Select(p => Observable.FromAsync(() => GetPage(p)))
  .Merge(maxConcurrent);

Explanation

To understand why this change is needed, we need some background:

  1. FromAsync returns an observable that invokes the passed Func every time it is subscribed to. This implies that if the observable is never subscribed to, the Func is never invoked.

  2. Merge eagerly reads the whole source sequence, but subscribes to at most n of the inner observables at the same time (n being the maxConcurrent argument).
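Both points can be checked in isolation. The sketch below assumes the System.Reactive NuGet package and C# top-level statements; the counters, the `Source` iterator and the value 42 are purely illustrative. The first half counts how often the FromAsync factory runs. The second half wraps each inner observable in `Observable.Defer` to count subscriptions and uses `Observable.Never` so that no inner sequence ever completes, which means `Merge(2)` should read all five elements of the source but subscribe to only two of them.

```csharp
// Assumes the System.Reactive NuGet package; uses C# top-level statements.
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// (1) FromAsync calls its factory once per subscription, and never before one.
int factoryCalls = 0;
IObservable<int> fromAsync = Observable.FromAsync(() =>
{
    Interlocked.Increment(ref factoryCalls);
    return Task.FromResult(42);
});
int callsBeforeSubscribe = factoryCalls;  // nothing has subscribed yet
fromAsync.Wait();                         // Wait() subscribes and blocks until completion
fromAsync.Wait();                         // a second subscription runs the factory again
int callsAfterTwoWaits = factoryCalls;

// (2) Merge(maxConcurrent) reads the whole source but subscribes to at most maxConcurrent inners.
int elementsRead = 0;
int subscriptions = 0;

// Hypothetical source: counts how many elements Merge pulls from it.
IEnumerable<IObservable<int>> Source()
{
    for (int i = 0; i < 5; i++)
    {
        elementsRead++;
        // Defer counts subscriptions; Never keeps each inner observable open forever.
        yield return Observable.Defer(() =>
        {
            subscriptions++;
            return Observable.Never<int>();
        });
    }
}

using IDisposable subscription = Source().Merge(2).Subscribe(_ => { });

Console.WriteLine($"FromAsync factory calls: before={callsBeforeSubscribe}, after two Wait() calls={callsAfterTwoWaits}");
Console.WriteLine($"Merge(2): elements read={elementsRead}, subscriptions={subscriptions}");
```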

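With these two facts we can see why the original version executes everything in parallel: pages.Select(GetPage) calls GetPage, which starts the download, as soon as each element is enumerated. Because of (2), Merge enumerates the entire source up front, so GetPage has already been invoked for every page by the time Merge decides which observables to subscribe to. Wrapping the already-running task in FromAsync(() => t) cannot delay work that has already started.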

We can also see why the second version works: even though the source sequence is fully iterated over, (1) means that GetPage is not invoked until Merge actually subscribes to the corresponding observable. Since Merge subscribes to at most n observables at a time, at most n tasks execute simultaneously, which is the desired result.
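To see the difference in practice, here is a sketch that runs both pipelines side by side and records how many GetPage calls are in flight at once. It assumes the System.Reactive NuGet package and C# top-level statements; GetPage here is a hypothetical stand-in that replaces the download with `Task.Delay(200)`, and the `inFlight`/`peak` counters exist only for the illustration. Under these assumptions the question's version should report a peak well above maxConcurrent (every task is started while Merge reads the source), while the answer's version should stay at or below it.

```csharp
// Assumes the System.Reactive NuGet package; uses C# top-level statements.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

const int maxConcurrent = 3;
IEnumerable<string> pages = Enumerable.Range(1, 10).Select(i => "page" + i);

int inFlight = 0; // GetPage calls started but not yet finished
int peak = 0;     // highest value inFlight has reached

// Hypothetical stand-in for the question's GetPage: a 200 ms delay replaces the download.
async Task<string> GetPage(string page)
{
    int now = Interlocked.Increment(ref inFlight);
    int seen;
    // Raise peak to now if now is larger (lock-free compare-and-swap loop).
    while (now > (seen = Volatile.Read(ref peak)) &&
           Interlocked.CompareExchange(ref peak, now, seen) != seen) { }
    await Task.Delay(200);
    Interlocked.Decrement(ref inFlight);
    return "result for " + page;
}

// Question's version: Select(GetPage) starts a task for every page as Merge reads the source.
int eagerCount = pages.Select(GetPage)
    .Select(t => Observable.FromAsync(() => t))
    .Merge(maxConcurrent)
    .ToEnumerable()
    .Count();
int eagerPeak = peak;

peak = 0; // every task has finished here, so inFlight is already back to 0

// Answer's version: GetPage runs only when Merge subscribes to its FromAsync observable.
int deferredCount = pages.Select(p => Observable.FromAsync(() => GetPage(p)))
    .Merge(maxConcurrent)
    .ToEnumerable()
    .Count();
int deferredPeak = peak;

Console.WriteLine($"question's version: {eagerCount} results, peak concurrency {eagerPeak}");
Console.WriteLine($"answer's version: {deferredCount} results, peak concurrency {deferredPeak}");
```

The answer's version takes longer overall, because only a few delays overlap at any moment; that is exactly the throttling the question asks for. Both versions still convert back to IEnumerable<T> with ToEnumerable, as the question's note requires.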

Matthew Finlay answered Apr 26 '23 12:04