
Pitfalls of trying to use PLINQ over long-running generators?

I have a few infinite generator methods, including some long-running and infinitely-long-running generators.

IEnumerable<T> ExampleOne() { 
    while(true) // this one blocks for a few seconds at a time
        yield return LongRunningFunction();
}
IEnumerable<T> ExampleTwo() { 
    while(true) //this one blocks for a really long time
        yield return OtherLongRunningFunction();
}

My goal is to have an infinite sequence that combines the items from the two examples. Here's what I tried, using PLINQ:

IEnumerable<T> combined = new[] { ExampleOne(), ExampleTwo() }
          .AsParallel()
          .WithMergeOptions(ParallelMergeOptions.NotBuffered)
          .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
          .SelectMany(source => source); // flatten both generators into one sequence

This appears to combine the two IEnumerables correctly: items from IEnumerable #1 and #2 become available as soon as either source yields them:

// assuming ExampleOne yields "one" and ExampleTwo yields "TWO",
// roughly 5 times less often than ExampleOne
Example output: one one one one one TWO one one one one one one TWO

However, sometimes (usually after many hours of running) OtherLongRunningFunction() goes a long time without returning, and, under conditions that are hard to reproduce, the combined sequence blocks on it instead of continuing to return results from LongRunningFunction. It looks as though the parallel query started out using two threads and later switched to one.

My first thought was "this is probably a job for Rx Observable.Merge and not for PLINQ." But I'd appreciate both answers that show correct alternative ways to handle this situation and explanations of how PLINQ can change its degree of parallelism hours after a query starts.

asked Jan 25 '12 06:01 by Jimmy


2 Answers

Here's the Rx way to do it, and indeed, it does use Merge:

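IObservable<T> LongRunningFunction()
{
    // Observable.Start runs the function once on the task pool and caches its
    // single result (Rx 2.x+: TaskPoolScheduler.Default; Rx 1.x: Scheduler.TaskPool).
    return Observable.Start(() => {
        // Calculate some stuff
        return blah;
    }, TaskPoolScheduler.Default);
}

// OtherLongRunningFunction is written the same way. Defer calls the function
// again on every resubscription, so Repeat() produces fresh results each time.
Observable.Merge(
    Observable.Defer(() => LongRunningFunction()).Repeat(),
    Observable.Defer(() => OtherLongRunningFunction()).Repeat()
).Subscribe(x => {
    Console.WriteLine("An item: {0}", x);
});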
answered Oct 17 '22 16:10 by Ana Betts


If you want the benefits of the TPL, especially for tasks with varying loads (what happens when your subscriber blocks after a number of items have been produced? Should you stop yielding items?), I recommend TPL Dataflow.
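To illustrate the backpressure question without pulling in Dataflow itself, here is a minimal sketch that swaps in `System.Threading.Channels` (built into .NET Core 3.0 and later). `MergeBounded` and `Counting` are hypothetical names, and the capacity of 4 is arbitrary. Each source is drained on a `LongRunning` task, and a bounded channel with `BoundedChannelFullMode.Wait` makes a producer wait when the consumer falls behind, so it stops pulling from its generator instead of buffering without limit.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for a fast generator such as ExampleOne;
// it counts how many items it has been asked to produce.
int produced = 0;
IEnumerable<int> Counting()
{
    while (true)
        yield return Interlocked.Increment(ref produced);
}

var reader = MergeBounded(4, Counting());

// Read nothing for a while: the producer should stall once the channel is full.
await Task.Delay(200);
int idleCount = Volatile.Read(ref produced);
Console.WriteLine($"produced while consumer was idle: {idleCount}");

var firstThree = new List<int>();
await foreach (var item in reader.ReadAllAsync())
{
    firstThree.Add(item);
    if (firstThree.Count == 3) break;
}
Console.WriteLine(string.Join(" ", firstThree));

// Merges blocking generators through a bounded channel.
// Assumption: sources are infinite, so the writer is never completed.
ChannelReader<T> MergeBounded<T>(int capacity, params IEnumerable<T>[] sources)
{
    var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
    {
        FullMode = BoundedChannelFullMode.Wait   // wait for space rather than drop items
    });
    foreach (var source in sources)
    {
        // LongRunning asks the default scheduler for a dedicated thread, so a
        // generator that blocks for hours does not occupy a thread-pool thread.
        Task.Factory.StartNew(() =>
        {
            foreach (var item in source)
                // Blocking wait keeps enumeration on this dedicated thread.
                channel.Writer.WriteAsync(item).AsTask().GetAwaiter().GetResult();
        }, TaskCreationOptions.LongRunning);
    }
    return channel.Reader;
}
```

While the consumer is idle, the producer can run at most one item past the channel's capacity. A generator that throws leaves its task faulted and unobserved here, so real code would likely catch the exception and pass it to `channel.Writer.TryComplete(ex)`.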

If you want to do it with Rx, it's best not to block the thread pool with really long-running computational tasks:

var stream = Observable.Merge(
    ExampleTwo().ToObservable(NewThreadScheduler.Default),   // Rx 1.x: Scheduler.NewThread
    ExampleOne().ToObservable(NewThreadScheduler.Default));

stream.Subscribe(...);
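The same "one dedicated thread per blocking source" idea can be sketched without Rx, using only `BlockingCollection<T>` from `System.Collections.Concurrent`. This is a minimal illustration, not the answer's code: `MergeOnDedicatedThreads`, `Frequent` and `Stuck` are hypothetical names, and `Stuck` simulates `OtherLongRunningFunction` never returning. Items from `Frequent` keep arriving even though the other source is blocked.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Hypothetical stand-ins for the question's generators: one yields often,
// the other yields once and then blocks forever.
IEnumerable<string> Frequent()
{
    while (true)
    {
        Thread.Sleep(10);
        yield return "one";
    }
}

IEnumerable<string> Stuck()
{
    yield return "TWO";
    Thread.Sleep(Timeout.Infinite);   // simulates a call that never returns
}

var firstTwenty = MergeOnDedicatedThreads(Frequent(), Stuck()).Take(20).ToList();
Console.WriteLine(string.Join(" ", firstTwenty));

// Drains every source on its own dedicated background thread into one queue,
// so a source that blocks for hours cannot stall items from the others.
// Threads start immediately when this function is called.
IEnumerable<T> MergeOnDedicatedThreads<T>(params IEnumerable<T>[] sources)
{
    var queue = new BlockingCollection<T>();      // unbounded by default
    foreach (var source in sources)
    {
        var thread = new Thread(() =>
        {
            foreach (var item in source)
                queue.Add(item);
        })
        { IsBackground = true };                  // don't keep the process alive on exit
        thread.Start();
    }
    return queue.GetConsumingEnumerable();        // blocks until an item is available
}
```

Because the collection is unbounded, producers keep adding items if the consumer stops reading; passing a `boundedCapacity` to the `BlockingCollection<T>` constructor makes `Add` block instead. An exception thrown by a generator goes unhandled on its thread and terminates the process, so real code would likely catch and forward it.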
answered Oct 17 '22 15:10 by Asti