
Is there an asynchronous version of PLINQ?

I want to execute a query over a stream of data while processing items in parallel with a certain degree of parallelism. Normally, I'd use PLINQ for that, but my work items are not CPU bound but IO bound. I want to use async IO. PLINQ does not support async work.

What's the smartest way of running a PLINQ-style query, but with async work items?


Here's a more detailed illustration of the problem:

My goal is to process a potentially infinite stream of "items" in a way that is logically described by the following query:

var items = new int[10]; // simulate data

var results =
    from x in items.AsParallel().WithDegreeOfParallelism(100)
    where Predicate(x)
    select ComputeSomeValue(x);

foreach (var result in results)
    PerformSomeAction(result);

This query is just a sketch of the real query. Now I want each of the placeholder functions to be asynchronous (returning a Task and internally being based on async IO).

Note that there may be far more items than fit in memory. I must also control the degree of parallelism to max out the underlying network and disk hardware.

This question is not about multi-core. It fully applies to machines with only one CPU core because the IO can still benefit from parallelism. Think of slow web-service calls and the like.

usr asked Jan 16 '14 18:01



2 Answers

This sounds like a job for Microsoft's Reactive Framework (Rx, also known as Reactive Extensions).

I started with this code as my initial variables:

var items = Enumerable.Range(0, 10).ToArray();

Func<int, bool> Predicate = x => x % 2 == 0;

Func<int, int> ComputeSomeValue = x =>
{
    Thread.Sleep(10000); // simulate 10 seconds of blocking work
    return x * 3;
};

I then used a regular LINQ query as a baseline:

var results =
    from x in items
    where Predicate(x)
    select ComputeSomeValue(x);

This took 50 seconds to compute the following results:

[image: results of the enumerable query]

Then I switched over to an observable (reactive framework) query:

var results =
    from x in items.ToObservable()
    where Predicate(x)
    from y in Observable.Start(() => ComputeSomeValue(x))
    select y;

This took 10 seconds to get:

[image: results of the observable query]

It's clearly computing in parallel.

However, the results are out of order. So I changed the query to this:

var query =
    from x in items.ToObservable()
    where Predicate(x)
    from y in Observable.Start(() => ComputeSomeValue(x))
    select new { x, y };

var results =
    query
        .ToEnumerable()
        .OrderBy(z => z.x)
        .Select(z => z.y);

That still took 10 seconds, but I got the results back in the correct order.

Now, the only issue here is the WithDegreeOfParallelism. There are a couple of things to try here.

First up, I changed the code to produce 10,000 values with a 10 ms compute time. My standard LINQ query still took 50 seconds, but the reactive query took 6.3 seconds. If it could have performed all the computations at the same time, it should have taken much less. This suggests that it is maxing out the asynchronous pipeline, that is, the scheduler is already capping how much work runs at once.

The second point is that the reactive framework uses schedulers for all of its work scheduling. You could try the variety of schedulers that come with the reactive framework to find an alternative if the default one doesn't do what you want. Or you could even write your own scheduler to do whatever scheduling you like.
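Besides swapping schedulers, one way to get an explicit cap is the Merge overload that takes a maxConcurrent argument. The following is only a minimal sketch, not something I benchmarked above: it assumes the System.Reactive NuGet package, and ComputeSomeValueAsync is a hypothetical async stand-in for ComputeSomeValue, with Task.Delay simulating the IO.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;      // assumption: System.Reactive NuGet package is referenced
using System.Threading.Tasks;

public static class BoundedRxSketch
{
    // Hypothetical async version of ComputeSomeValue; Task.Delay stands in for real async IO.
    static async Task<int> ComputeSomeValueAsync(int x)
    {
        await Task.Delay(100);
        return x * 3;
    }

    public static async Task<IList<int>> RunAsync(int maxConcurrent)
    {
        var items = Enumerable.Range(0, 10);

        return await items.ToObservable()
            .Where(x => x % 2 == 0)
            // FromAsync is deferred: the task is only started when the inner sequence is subscribed to.
            .Select(x => Observable.FromAsync(() => ComputeSomeValueAsync(x)))
            // Merge subscribes to at most maxConcurrent inner sequences at any one time.
            .Merge(maxConcurrent)
            // Collect everything; results arrive in completion order, not source order.
            .ToList();
    }

    public static async Task Main()
    {
        var results = await RunAsync(maxConcurrent: 2);
        Console.WriteLine(string.Join(", ", results.OrderBy(r => r)));
    }
}
```

The ToList call is only there to make the sketch easy to inspect. For a stream that does not fit in memory you would subscribe and handle each result as it arrives instead.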


Here's a version of the query that computes the predicate in parallel too.

var results =
    from x in items.ToObservable()
    from p in Observable.Start(() => Predicate(x))
    where p
    from y in Observable.Start(() => ComputeSomeValue(x))
    select new { x, y };
Enigmativity answered Nov 13 '22 08:11



As stated here, PLINQ is for running LINQ queries in parallel on multi-core/multi-processor systems. It has little to do with systems that have many disk units and fast networking. AFAIK, it is made for running executable code on more cores, not for concurrently dispatching multiple I/O requests to the operating system.

Maybe your Predicate(x) is CPU bound, in which case you may perform that filtering operation using PLINQ. But you cannot apply the I/O-demanding operations (ComputeSomeValue, PerformSomeAction) in the same way.

What you can do is define a chain of operations (two in your case) for each item (see continuation tasks) and dispatch that chain (sequentially (?)).
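A rough sketch of such a per-item chain, written with async/await rather than explicit continuation tasks and throttled with a SemaphoreSlim, might look like the code below. PredicateAsync and ComputeSomeValueAsync are hypothetical async stand-ins for the placeholders in the question (Task.Delay simulates the IO), and maxConcurrency plays the role of WithDegreeOfParallelism.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledChains
{
    // Hypothetical async stand-ins for the question's placeholder functions.
    static async Task<bool> PredicateAsync(int x) { await Task.Delay(10); return x % 2 == 0; }
    static async Task<int> ComputeSomeValueAsync(int x) { await Task.Delay(10); return x * 3; }

    // Runs the predicate -> compute chain per item, with at most maxConcurrency chains in flight.
    public static async Task<List<int>> RunAsync(IEnumerable<int> items, int maxConcurrency)
    {
        var gate = new SemaphoreSlim(maxConcurrency);
        var results = new List<int>();
        var pending = new List<Task>();

        foreach (var x in items)
        {
            await gate.WaitAsync();       // wait for a free slot before starting another chain
            pending.Add(ProcessAsync(x));
        }

        await Task.WhenAll(pending);      // wait for the chains that are still running
        return results;

        async Task ProcessAsync(int x)
        {
            try
            {
                if (await PredicateAsync(x))
                {
                    var y = await ComputeSomeValueAsync(x);
                    lock (results) results.Add(y);   // chains may finish on different threads
                }
            }
            finally
            {
                gate.Release();           // free the slot even if the chain throws
            }
        }
    }
}
```

Results come back in completion order. Note that this sketch keeps every task and every result in a list, so for a truly unbounded stream you would have to drop completed tasks and act on each result immediately instead of collecting it.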

Also, you mentioned an "infinite stream of items". This sounds a bit like the producer-consumer problem, if those items are also I/O generated.
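If it is a producer-consumer problem, one possible shape (a sketch only, assuming .NET Core 3.0 or later for System.Threading.Channels and await foreach) is a bounded channel drained by a fixed number of workers. The bounded capacity keeps memory use flat and the worker count sets the degree of parallelism. ComputeSomeValueAsync, the capacity of 100, and the integer producer are all hypothetical placeholders.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ProducerConsumerSketch
{
    // Hypothetical async work item; Task.Delay stands in for real async IO.
    static async Task<int> ComputeSomeValueAsync(int x) { await Task.Delay(10); return x * 3; }

    public static async Task<int[]> RunAsync(int itemCount, int workerCount)
    {
        // Bounded: WriteAsync waits while the buffer is full, so the producer cannot run ahead.
        var channel = Channel.CreateBounded<int>(capacity: 100);
        var results = new ConcurrentBag<int>();

        var producer = Task.Run(async () =>
        {
            try
            {
                for (var i = 0; i < itemCount; i++)
                    await channel.Writer.WriteAsync(i);
            }
            finally
            {
                channel.Writer.Complete();   // lets the workers' read loops finish
            }
        });

        // Each worker handles one item at a time, so workerCount is the degree of parallelism.
        var workers = Enumerable.Range(0, workerCount).Select(_ => Task.Run(async () =>
        {
            await foreach (var x in channel.Reader.ReadAllAsync())
                results.Add(await ComputeSomeValueAsync(x));
        })).ToArray();

        await producer;
        await Task.WhenAll(workers);
        return results.ToArray();
    }
}
```

Collecting into a ConcurrentBag is just for demonstration. With an endless stream each worker would call something like PerformSomeAction on its result instead of storing it.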

Maybe your problem is not that multi-core friendly... It may be just I/O demanding, that's all...

turdus-merula answered Nov 13 '22 08:11
