
Making PLINQ and BlockingCollection work together

Tags: c#, delay, plinq

I have put together a simple application that monitors file creation events, creates objects from each file's content, and processes them. Here is the sample code:

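using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;

class Program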
{
    private const string Folder = "C:\\Temp\\InputData";

    static void Main(string[] args)
    {
        var cts = new CancellationTokenSource();
        foreach (var obj in Input(cts.Token))
            Console.WriteLine(obj);
    }

    public static IEnumerable<object> Input(CancellationToken cancellationToken)
    {
        var fileList = new BlockingCollection<string>();

        var watcher = new FileSystemWatcher(Folder);
        watcher.Created += (source, e) =>
        {
            if (cancellationToken.IsCancellationRequested)
                watcher.EnableRaisingEvents = false;
            else if (Path.GetFileName(e.FullPath) == "STOP")
            {
                watcher.EnableRaisingEvents = false;
                fileList.CompleteAdding();
                File.Delete(e.FullPath);
            }
            else
                fileList.Add(e.FullPath);
        };
        watcher.EnableRaisingEvents = true;

        return from file in
                   fileList.GetConsumingEnumerable(cancellationToken)
               //.AsParallel()
               //.WithCancellation(cancellationToken)
               //.WithDegreeOfParallelism(5)
               let obj = CreateMyObject(file)
               select obj;
    }

    private static object CreateMyObject(string file)
    {
        return file;
    }
}

It all works fine, but when I uncomment AsParallel (and the next two lines) the query no longer yields results right away. Is this delay caused by PLINQ partitioning? I expect the query to yield each item as soon as it is added to the BlockingCollection. Is this possible to achieve using PLINQ?

Yuriy Magurdumov asked Nov 04 '22 13:11


1 Answer

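That is what .WithMergeOptions(ParallelMergeOptions.NotBuffered) is designed for: it asks PLINQ to hand each result to the consumer as soon as it has been computed, instead of collecting results in an output buffer first.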
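
A minimal sketch of how the merge option could be applied to the query from the question. The class `StreamingPlinq` and the helper `SelectStreaming` are hypothetical names introduced only for illustration. The sketch also wraps the consuming enumerable in a partitioner created with `EnumerablePartitionerOptions.NoBuffering` (available from .NET 4.5). That part goes beyond the answer and rests on the assumption that PLINQ's default chunking of an `IEnumerable` input can hold items back in the same way output buffering does.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

static class StreamingPlinq
{
    // Hypothetical helper (not part of the original question or answer):
    // builds a PLINQ query over a BlockingCollection that tries to avoid
    // both input-side chunking and output-side buffering.
    public static ParallelQuery<TResult> SelectStreaming<T, TResult>(
        BlockingCollection<T> source,
        Func<T, TResult> selector,
        CancellationToken cancellationToken)
    {
        // Assumption: NoBuffering makes each worker take one item at a time
        // instead of waiting until a whole chunk of input is available.
        var partitioner = Partitioner.Create(
            source.GetConsumingEnumerable(cancellationToken),
            EnumerablePartitionerOptions.NoBuffering);

        return partitioner
            .AsParallel()
            .WithCancellation(cancellationToken)
            .WithDegreeOfParallelism(5)
            // NotBuffered makes each result available to the consumer
            // as soon as it has been computed.
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            .Select(selector);
    }
}
```

In the question's Input method, the return statement could then read `return StreamingPlinq.SelectStreaming(fileList, file => CreateMyObject(file), cancellationToken);`. Result order is not guaranteed, since no ordering is requested from PLINQ.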

springy76 answered Nov 15 '22 07:11