Read and process files in parallel C#

I have very big files that I have to read and process. Can this be done in parallel using Threading?

Here is a bit of code that I've written, but it doesn't seem to run any faster than reading and processing the files one after the other.

String[] files = openFileDialog1.FileNames;

Parallel.ForEach(files, f =>
{
    readTraceFile(f);
});

private void readTraceFile(String file)
{
    // Dispose the reader when done; otherwise the file handle leaks.
    using (StreamReader reader = new StreamReader(file))
    {
        String line;

        while ((line = reader.ReadLine()) != null)
        {
            // Traces are separated by runs of four or more spaces.
            String pattern = "\\s{4,}";

            foreach (String trace in Regex.Split(line, pattern))
            {
                if (trace != String.Empty)
                {
                    String[] details = Regex.Split(trace, "\\s+");

                    Instruction instruction = new Instruction(details[0],
                        int.Parse(details[1]),
                        int.Parse(details[2]));
                    Console.WriteLine("computing...");

                    // Note: List<T>.Add is not thread-safe. When this method
                    // runs inside Parallel.ForEach, 'instructions' needs to
                    // be a thread-safe collection (or the Add must be locked).
                    instructions.Add(instruction);
                }
            }
        }
    }
}
asked Jan 05 '14 by luca.p.alexandru

2 Answers

It looks like your application's performance is mostly limited by IO. However, you still have a bit of CPU-bound work in your code. These two bits of work are interdependent: your CPU-bound work cannot start until the IO has done its job, and the IO does not move on to the next work item until your CPU has finished with the previous one. They're both holding each other up. Therefore, it is possible (explained at the very bottom) that you will see an improvement in throughput if you perform your IO- and CPU-bound work in parallel, like so:

void ReadAndProcessFiles(string[] filePaths)
{
    // Our thread-safe collection used for the handover.
    var lines = new BlockingCollection<string>();

    // Build the pipeline.
    var stage1 = Task.Run(() =>
    {
        try
        {
            foreach (var filePath in filePaths)
            {
                using (var reader = new StreamReader(filePath))
                {
                    string line;

                    while ((line = reader.ReadLine()) != null)
                    {
                        // Hand over to stage 2 and continue reading.
                        lines.Add(line);
                    }
                }
            }
        }
        finally
        {
            lines.CompleteAdding();
        }
    });

    var stage2 = Task.Run(() =>
    {
        // Process lines on a ThreadPool thread
        // as soon as they become available.
        foreach (var line in lines.GetConsumingEnumerable())
        {
            String pattern = "\\s{4,}";

            foreach (String trace in Regex.Split(line, pattern))
            {
                if (trace != String.Empty)
                {
                    String[] details = Regex.Split(trace, "\\s+");

                    Instruction instruction = new Instruction(details[0],
                        int.Parse(details[1]),
                        int.Parse(details[2]));
                    Console.WriteLine("computing...");
                    instructions.Add(instruction);
                }
            }
        }
    });

    // Block until both tasks have completed.
    // This makes this method prone to deadlocking.
    // Consider using 'await Task.WhenAll' instead.
    Task.WaitAll(stage1, stage2);
}
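One caveat with the code above: the BlockingCollection is unbounded, so if stage 1 reads lines faster than stage 2 can parse them, the buffered lines can consume a lot of memory. If that is a concern, a bounded capacity can be passed to the constructor (the capacity value below is illustrative, not from the original answer); Add then blocks once the buffer is full, applying back-pressure to the reader:

// Cap the handover buffer; Add blocks once 10,000 lines are queued,
// so the reader can never get too far ahead of the parser.
var lines = new BlockingCollection<string>(boundedCapacity: 10000);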

I highly doubt that it's your CPU work holding things up, but if it happens to be the case, you can also parallelise stage 2 like so:

    var stage2 = Task.Run(() =>
    {
        var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };

        Parallel.ForEach(lines.GetConsumingEnumerable(), parallelOptions, line =>
        {
            String pattern = "\\s{4,}";

            foreach (String trace in Regex.Split(line, pattern))
            {
                if (trace != String.Empty)
                {
                    String[] details = Regex.Split(trace, "\\s+");

                    Instruction instruction = new Instruction(details[0],
                        int.Parse(details[1]),
                        int.Parse(details[2]));
                    Console.WriteLine("computing...");
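                    // NOTE: this lambda runs on multiple threads, so
                    // 'instructions' must be a thread-safe collection here
                    // (e.g. ConcurrentBag<Instruction>); List<T>.Add is not.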
                    instructions.Add(instruction);
                }
            }
        });
    });

Mind you, if your CPU work component is negligible in comparison to the IO component, you won't see much speed-up. The more even the workload is, the better the pipeline is going to perform in comparison with sequential processing.

Since we're talking about performance, note that I am not particularly thrilled about the number of blocking calls in the above code. If I were doing this in my own project, I would have gone the async/await route. I chose not to do so in this case because I wanted to keep things easy to understand and easy to integrate.
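For the curious, here is a rough sketch of what that async/await route might look like (the method name and the bounded capacity are my own illustration, not part of the original answer). It keeps the same two-stage handover, but reads asynchronously and awaits both tasks instead of blocking on them:

async Task ReadAndProcessFilesAsync(string[] filePaths)
{
    var lines = new BlockingCollection<string>(boundedCapacity: 10000);

    var stage1 = Task.Run(async () =>
    {
        try
        {
            foreach (var filePath in filePaths)
            {
                using (var reader = new StreamReader(filePath))
                {
                    string line;

                    // ReadLineAsync frees the thread while waiting on IO.
                    while ((line = await reader.ReadLineAsync()) != null)
                    {
                        lines.Add(line);
                    }
                }
            }
        }
        finally
        {
            lines.CompleteAdding();
        }
    });

    var stage2 = Task.Run(() =>
    {
        foreach (var line in lines.GetConsumingEnumerable())
        {
            // Parse 'line' exactly as in stage 2 above.
        }
    });

    // Await rather than block, so the caller's thread is not tied up.
    await Task.WhenAll(stage1, stage2);
}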

answered Sep 22 '22 by Kirill Shlenskiy

From the look of what you are trying to do, you are almost certainly I/O bound. Attempting parallel processing in that case will not help, and may in fact slow things down due to the additional seek operations on the disk drives (unless you can have the data split over multiple spindles).

answered Sep 20 '22 by Gary Walker