I have a few infinite generator methods, including some that are long-running and some that can block for a very long time between items.
IEnumerable<T> ExampleOne() {
    while (true) // this one blocks for a few seconds at a time
        yield return LongRunningFunction();
}
IEnumerable<T> ExampleTwo() {
    while (true) // this one blocks for a really long time
        yield return OtherLongRunningFunction();
}
My goal is to have an infinite sequence that combines the items from the two examples. Here's what I tried, using PLINQ:
IEnumerable<T> combined = new[] { ExampleOne(), ExampleTwo() }
    .AsParallel()
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
    .SelectMany(source => source);
This seems to combine the two IEnumerables appropriately into a new one, with items from IEnumerable #1 and #2 becoming available whenever they appear in either of the two source IEnumerables:
// assuming ExampleTwo yields TWO but happens roughly 5 times
// less often than ExampleOne
Example output: one one one one one TWO one one one one one one TWO
However, it seems that sometimes (usually after many hours of running) OtherLongRunningFunction() goes for a long period without returning and, under conditions that are hard to reproduce, the combined sequence blocks on it rather than continuing to return results from the first LongRunningFunction. It seems that although the combined parallel query started off using two threads, it decided to switch to one thread later on.
My first thought was "this is probably a job for Rx Observable.Merge and not for PLINQ." But I'd appreciate both answers that show correct alternative ways to handle this situation and explanations of the mechanics of how PLINQ can change the degree of parallelism hours after the start of a query.
Here's the Rx way to do it, and indeed, it does use Merge:
IObservable<T> LongRunningFunction()
{
    // Run the computation on the task pool and publish its single result
    return Observable.Start(() => {
        // Calculate some stuff
        return blah;
    }, Scheduler.TaskPoolScheduler);
}

Observable.Merge(
    Observable.Defer(LongRunningFunction).Repeat(),
    Observable.Defer(OtherLongRunningFunction).Repeat()
).Subscribe(x => {
    Console.WriteLine("An item: {0}", x);
});
If you want the benefits of the TPL, especially for tasks with varying loads (what happens when your subscriber blocks and a number of items have already been produced: should you stop yielding items?), I recommend TPL Dataflow.
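As a rough illustration of the backpressure question above, here is a minimal sketch that assumes the System.Threading.Tasks.Dataflow library is referenced. The class DataflowMergeSketch, its Merge method, and the capacity parameter are hypothetical names made up for this example. Each source is pumped into one bounded BufferBlock, so a producer waits whenever the consumer falls behind. Cancellation and error handling are left out.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class DataflowMergeSketch
{
    // Hypothetical helper: merges blocking sources through one bounded buffer.
    public static IEnumerable<T> Merge<T>(int capacity, params IEnumerable<T>[] sources)
    {
        var buffer = new BufferBlock<T>(
            new DataflowBlockOptions { BoundedCapacity = capacity });

        var pumps = new List<Task>();
        foreach (var source in sources)
        {
            var local = source;
            // LongRunning hints that the pump should not occupy a pool thread.
            pumps.Add(Task.Factory.StartNew(() =>
            {
                foreach (var item in local)
                {
                    // SendAsync finishes only once the buffer has room,
                    // which is what throttles a fast producer.
                    if (!buffer.SendAsync(item).Result) break;
                }
            }, TaskCreationOptions.LongRunning));
        }

        // Complete the buffer when every source has ended
        // (this never happens for infinite sources).
        Task.WhenAll(pumps).ContinueWith(_ => buffer.Complete());

        // Yield items in arrival order until the buffer is completed and drained.
        while (buffer.OutputAvailableAsync().Result)
        {
            T item;
            while (buffer.TryReceive(out item))
                yield return item;
        }
    }
}
```

With the generators from the question this would be called as DataflowMergeSketch.Merge(16, ExampleOne(), ExampleTwo()), where 16 is an arbitrary capacity. A slow ExampleTwo only stalls its own pump task, while items from ExampleOne keep flowing until the buffer fills.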
If you want to do it with Rx, then for really long-running computational tasks it's best not to block the thread pool, so give each source its own thread:
var stream = Observable.Merge(
    ExampleTwo().ToObservable(Scheduler.NewThread),
    ExampleOne().ToObservable(Scheduler.NewThread));
stream.Subscribe(...);
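The same "one dedicated thread per source" idea can be sketched without Rx, using only BlockingCollection from the base class library. This is an illustration under stated assumptions, not a drop-in replacement: ThreadMergeSketch and Merge are hypothetical names, the capacity of 16 is arbitrary, and an exception thrown by a source is not handled.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

static class ThreadMergeSketch
{
    // Hypothetical helper: drains each source on its own thread into a shared queue.
    public static IEnumerable<T> Merge<T>(params IEnumerable<T>[] sources)
    {
        var queue = new BlockingCollection<T>(16);
        int remaining = sources.Length;
        if (remaining == 0) queue.CompleteAdding();

        foreach (var source in sources)
        {
            var local = source;
            var thread = new Thread(() =>
            {
                try
                {
                    // Add blocks while the queue is full, so a slow consumer
                    // throttles the producers instead of growing memory.
                    foreach (var item in local)
                        queue.Add(item);
                }
                finally
                {
                    // The last source to finish closes the queue.
                    if (Interlocked.Decrement(ref remaining) == 0)
                        queue.CompleteAdding();
                }
            });
            thread.IsBackground = true; // do not keep the process alive
            thread.Start();
        }

        // Blocks until an item arrives and ends once the queue is closed and empty.
        return queue.GetConsumingEnumerable();
    }
}
```

Because every source owns a thread for its whole lifetime, a stalled OtherLongRunningFunction cannot stop items from ExampleOne reaching the consumer, and nothing can reduce the number of workers later on.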