I'm trying to use the Reactive Extensions (Rx) to buffer an enumeration of Tasks as they complete. Does anyone know if there is a clean, built-in way of doing this? The ToObservable extension method will just make an IObservable<Task<T>>, which is not what I want. I want an IObservable<T> that I can then use Buffer on.
Contrived example:
    // Method designed to be awaitable
    public static Task<int> makeInt()
    {
        return Task.Run(() => 5);
    }

    // In practice, however, I don't want to await each individual task.
    // I want to await chunks of them at a time, which *should* be easy with Observable.Buffer
    public static void Main()
    {
        // Make a bunch of tasks
        IEnumerable<Task<int>> futureInts = Enumerable.Range(1, 100).Select(t => makeInt());

        // Is there a built-in way to turn this into an Observable that I can then buffer?
        // Note: Buffer(15) yields IObservable<IList<int>>, not IObservable<int>
        IObservable<IList<int>> buffered = futureInts.TasksToObservable().Buffer(15); //????

        buffered.Subscribe(ints => {
            Console.WriteLine(ints.Count()); // Should be 15 (10 for the final chunk, since 100 is not a multiple of 15)
        });
    }
You can use the fact that a Task<T> can be converted to an observable using another overload of ToObservable(), which is defined in the System.Reactive.Threading.Tasks namespace.
When you have a collection of (single-item) observables, you can use Merge() to create a single observable that yields the items as they complete.
So, your code could look like this:
    futureInts.Select(t => t.ToObservable())
        .Merge()
        .Buffer(15)
        .Subscribe(ints => Console.WriteLine(ints.Count));
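For completeness, here is a minimal self-contained sketch of the same idea wrapped in the extension method the question asked for. TasksToObservable is not part of Rx; it is a hypothetical helper named after the question's placeholder. MakeInt and CollectBufferSizes are also illustrative names, and the blocking Wait() call is only there so the result can be inspected from synchronous code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class TaskObservableSketch
{
    // Hypothetical helper (not an Rx built-in): converts each task to a
    // single-item observable, then merges them so that results are
    // emitted in completion order.
    public static IObservable<T> TasksToObservable<T>(this IEnumerable<Task<T>> tasks)
    {
        return tasks.Select(t => t.ToObservable()).Merge();
    }

    // Stand-in for the question's awaitable method.
    public static Task<int> MakeInt()
    {
        return Task.Run(() => 5);
    }

    // Buffers the task results into chunks and returns the size of each chunk.
    // Wait() blocks until the merged sequence completes (illustration only).
    public static IList<int> CollectBufferSizes(int taskCount, int bufferSize)
    {
        IEnumerable<Task<int>> futureInts =
            Enumerable.Range(1, taskCount).Select(_ => MakeInt());

        return futureInts
            .TasksToObservable()
            .Buffer(bufferSize)          // IObservable<IList<int>>
            .Select(chunk => chunk.Count)
            .ToList()                    // one IList<int> holding all chunk sizes
            .Wait();
    }
}
```

Calling TaskObservableSketch.CollectBufferSizes(100, 15) should give six chunks of 15 followed by a final chunk of 10, because Buffer emits whatever is left over when the source completes.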