
How do I write a thread-aware extension function for PLINQ?

Tags: c#, plinq

Does anybody know how to write an extension method for PLINQ that returns a ParallelQuery<T>?

More specifically, I have the following problem: I want to perform a transformation within a PLINQ query that needs an engine. Creating an engine is costly, and an engine cannot be accessed concurrently.

I could do the following:

var result = source.AsParallel()
                   .Select(i => { var e = new Engine(); return e.Process(i); });

Here, the engine is created once per item, which is too expensive.
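To make that cost concrete, here is a small runnable sketch. `CountingEngine` is a hypothetical stand-in for `Engine` that does nothing but count its instances, and `NaiveSelectDemo` is an illustrative name; neither comes from the question. With `new` inside the `Select` lambda, one engine is constructed per item, no matter how many threads PLINQ uses.

```csharp
using System.Linq;
using System.Threading;

// Hypothetical stand-in for the question's Engine: it only counts how many instances exist.
public sealed class CountingEngine
{
    private static int _created;

    public static int Created => Volatile.Read(ref _created);
    public static void Reset() => Interlocked.Exchange(ref _created, 0);

    public CountingEngine() => Interlocked.Increment(ref _created);

    public int Process(int item) => item * 2;
}

public static class NaiveSelectDemo
{
    // Mirrors the question's first attempt: a new engine is constructed inside the selector,
    // so every item pays the construction cost.
    public static int[] Run(int count)
    {
        CountingEngine.Reset();
        return Enumerable.Range(0, count)
                         .AsParallel()
                         .Select(i => { var e = new CountingEngine(); return e.Process(i); })
                         .ToArray();
    }
}
```

After `NaiveSelectDemo.Run(1000)`, `CountingEngine.Created` is 1000: one construction per item.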

I want the engine to be created once per thread.

With Aggregate, I can get close to what I want with something like this:

// helper class: engine to use plus list of results obtained in thread so far
class EngineAndResults {
   public Engine engine = null;
   public IEnumerable<ResultType> results;
}

var result = source.AsParallel ().Aggregate (

   // seedFactory: called once per partition (roughly, once per worker thread),
   // returning an empty list, but a new engine
   () => new EngineAndResults () {
       engine = new Engine (),
       results = Enumerable.Empty<ResultType> ()
   },

   // we process a new item and append its result to the partition-local list,
   // preserving the engine for further use
   (engineAndResults, item) => new EngineAndResults () {
       engine = engineAndResults.engine,
       results = Enumerable.Concat (
           engineAndResults.results,
           new ResultType [] { engineAndResults.engine.Process (item) }
       )
   },

   // tell PLINQ how to combine the accumulators of two partitions
   (engineAndResults1, engineAndResults2) => new EngineAndResults () {
       engine = engineAndResults1.engine,
       results = Enumerable.Concat (engineAndResults1.results, engineAndResults2.results)
   },

   // after all aggregations, how do we come to the result?
   engineAndResults => engineAndResults.results
);

As you can see, I misuse the accumulator to carry an engine per thread. The problem is that Aggregate is a terminal operation: at the end, PLINQ merges the per-thread results into a single IEnumerable, which forces all threads to finish and synchronize. That is a drawback if I want to chain further PLINQ operators afterwards.
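Here is a runnable sketch of the same Aggregate idea, assuming a hypothetical `PartitionEngine` and using a mutable, partition-local `List<int>` instead of chained `Enumerable.Concat` calls. It relies on the `seedFactory` overload of `ParallelEnumerable.Aggregate`, which calls the seed factory once per partition, so each partition gets its own engine. The return type shows the limitation described above: the result is a plain `List<int>`, not a `ParallelQuery<T>`, so any further parallel stage would need another `AsParallel()`.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Hypothetical engine standing in for the question's Engine; counts its instances.
public sealed class PartitionEngine
{
    private static int _created;
    public static int Created => Volatile.Read(ref _created);
    public PartitionEngine() => Interlocked.Increment(ref _created);
    public int Process(int item) => item * 10;
}

// Per-partition accumulator: the engine plus the results produced so far.
public sealed class EngineAndResults
{
    public PartitionEngine Engine { get; } = new PartitionEngine();
    public List<int> Results { get; } = new List<int>();
}

public static class AggregateDemo
{
    public static List<int> Run(IEnumerable<int> source, int degreeOfParallelism)
    {
        return source.AsParallel()
                     .WithDegreeOfParallelism(degreeOfParallelism)
                     .Aggregate(
                         // seedFactory: invoked once per partition, so one engine per partition
                         () => new EngineAndResults(),
                         // update: runs sequentially within a single partition, so mutation is safe
                         (acc, item) => { acc.Results.Add(acc.Engine.Process(item)); return acc; },
                         // combine: merges the results of two partitions
                         (left, right) => { left.Results.AddRange(right.Results); return left; },
                         // resultSelector: the final value is a plain List<int>, not a ParallelQuery
                         acc => acc.Results);
    }
}
```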

I would like to have something like this:

var result = source.AsParallel()
                   .SelectWithThreadwiseInitWhichIAmLookingFor(
                       () => new Engine(),
                       (engine, item) => engine.Process(item));

Does anybody have any idea how to achieve this?

asked Jun 22 '12 13:06 by JohnB


1 Answer

You could use ThreadLocal<T> to do this. Something like:

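// One Engine per thread: the factory runs the first time a given thread reads engine.Value,
// and that thread reuses the same Engine for every later item it processes.
var engine = new ThreadLocal<Engine>(() => new Engine());
var result = source.AsParallel()
                   .Select(item => engine.Value.Process(item));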
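A self-contained sketch of this ThreadLocal<T> approach, assuming a hypothetical `GuardedEngine` that throws if two threads ever call `Process` on the same instance at once, so concurrent access would surface as an error. Two details are additions, not part of the snippet above: the query is materialized with `ToList()` before the `ThreadLocal<T>` is disposed, because the PLINQ query is lazy and reading `Value` after `Dispose` throws `ObjectDisposedException`; and `trackAllValues: true` is passed so that `ThreadLocal<T>.Values` can be used afterwards to dispose every per-thread engine.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Hypothetical engine: not thread-safe, so it throws if used by two threads at the same time.
public sealed class GuardedEngine : IDisposable
{
    private static int _instances;
    private int _inUse; // 0 = idle, 1 = inside Process

    public GuardedEngine() => Interlocked.Increment(ref _instances);

    public static int Instances => Volatile.Read(ref _instances);
    public bool Disposed { get; private set; }

    public int Process(int item)
    {
        if (Interlocked.Exchange(ref _inUse, 1) != 0)
            throw new InvalidOperationException("Engine used concurrently.");
        try { return item + 1; }
        finally { Volatile.Write(ref _inUse, 0); }
    }

    public void Dispose() => Disposed = true;
}

public static class ThreadLocalEngineDemo
{
    public static (List<int> Results, IList<GuardedEngine> Engines) Run(IEnumerable<int> source)
    {
        // trackAllValues: true keeps every thread's engine reachable through engine.Values.
        using var engine = new ThreadLocal<GuardedEngine>(() => new GuardedEngine(), trackAllValues: true);

        var results = source.AsParallel()
                            .Select(item => engine.Value.Process(item))
                            .ToList(); // materialize before the ThreadLocal is disposed

        IList<GuardedEngine> allEngines = engine.Values; // snapshot: one engine per thread used
        foreach (var e in allEngines) e.Dispose();
        return (results, allEngines);
    }
}
```

The `ToList()` call is the price of deterministic cleanup, since it ends the parallel stage. If the engines need no cleanup, the lazy query from the answer can be kept as is and chained further.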
answered Nov 06 '22 00:11 by svick
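The question asked for an extension method that returns a ParallelQuery<T>. A minimal sketch that packages the ThreadLocal<T> idea that way follows; the name `SelectWithThreadLocalInit` is hypothetical. Because the returned query is lazy and may be enumerated later (or more than once), the method has no safe point at which to dispose the `ThreadLocal<T>`, and ThreadLocal<T> never disposes the values it holds. This sketch therefore suits per-thread state that needs no deterministic cleanup.

```csharp
using System;
using System.Linq;
using System.Threading;

public static class ParallelQueryExtensions
{
    // Hypothetical extension method: like Select, but each worker thread gets its own TState,
    // created lazily by stateFactory the first time that thread processes an item and
    // reused for every later item the same thread handles.
    public static ParallelQuery<TResult> SelectWithThreadLocalInit<TSource, TState, TResult>(
        this ParallelQuery<TSource> source,
        Func<TState> stateFactory,
        Func<TState, TSource, TResult> selector)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (stateFactory == null) throw new ArgumentNullException(nameof(stateFactory));
        if (selector == null) throw new ArgumentNullException(nameof(selector));

        // Not disposed here: the query is lazy, so there is no safe point to do it.
        var state = new ThreadLocal<TState>(stateFactory);

        // The result is still a ParallelQuery<TResult>, so further PLINQ operators can follow.
        return source.Select(item => selector(state.Value, item));
    }
}
```

Usage mirrors the call wished for in the question, e.g. `source.AsParallel().SelectWithThreadLocalInit(() => new Engine(), (engine, item) => engine.Process(item))`, and operators such as `Where` can be chained onto the result without leaving PLINQ.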