 

Good approach for hundreds of consumers and big files [closed]

I have several files (nearly 1 GB each) of data. Each file consists of string lines.

I need to process each of these files with several hundred consumers. Each consumer does processing that differs from the others. The consumers do not write anywhere concurrently; they only need the input string, and after processing they update their own local buffers. Consumers can easily be executed in parallel.

Important: within any one file, each consumer has to process all lines (without skipping) in the correct order (as they appear in the file). The order in which different files are processed doesn't matter.

Processing a single line with one consumer is comparatively fast; I expect less than 50 microseconds on a Core i5.

So now I'm looking for a good approach to this problem. This is going to be part of a .NET project, so please let's stick to .NET only (C# is preferred).

I know about the TPL and TPL Dataflow. I guess the most relevant piece would be BroadcastBlock, but I think the problem there is that with each line I'd have to wait for all consumers to finish before posting a new one, and I guess that would not be very efficient.
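
For reference, a minimal sketch of the kind of BroadcastBlock pipeline I mean (the file name, consumer count, and consumer bodies are placeholders, not part of the real project):

using System;
using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class BroadcastSketch
{
    static void Main()
    {
        // The cloning function here just passes the same string reference along.
        var broadcast = new BroadcastBlock<string>(line => line);

        // Two placeholder consumers standing in for several hundred.
        var consumers = new ActionBlock<string>[]
        {
            new ActionBlock<string>(line => { /* consumer 1 work */ }),
            new ActionBlock<string>(line => { /* consumer 2 work */ }),
        };
        foreach (var consumer in consumers)
            broadcast.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (string line in File.ReadLines("data1.txt"))  // placeholder file name
            broadcast.Post(line);
        broadcast.Complete();

        Task.WaitAll(Array.ConvertAll(consumers, c => c.Completion));
    }
}

Note that BroadcastBlock only guarantees to offer the most recent item to each linked target, so if the consumer blocks are bounded, a slow one can miss lines entirely; if they are unbounded, each ActionBlock queues every line and memory can grow without limit.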

I think the ideal situation would be something like this (a rough sketch of one way to approximate it follows the list):

  1. One thread reads from file and writes to the buffer.
  2. Each consumer, when it is ready, reads the line from the buffer concurrently and processes it.
  3. An entry shouldn't be deleted from the buffer as soon as one consumer reads it. It can be deleted only when all consumers have processed it.
  4. TPL schedules consumer threads itself.
  5. If one consumer outperforms the others, it shouldn't wait and can read more recent entries from the buffer.
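
For illustration, here is a rough chunked approximation of these points (the method name, chunk size, and consumer delegates are placeholders): one thread fills a shared chunk, all consumers then walk the same chunk in parallel, and the chunk is released only once every consumer is done, so a fast consumer waits only at chunk boundaries rather than after every line.

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

static class ChunkedBroadcast
{
    public static void Run(IEnumerable<string> fileNames, Action<string>[] consumers, int chunkSize)
    {
        var chunk = new List<string>(chunkSize);
        foreach (string fileName in fileNames)
        {
            using (var reader = new StreamReader(fileName))
            {
                string line;
                while ((line = reader.ReadLine()) != null)
                {
                    chunk.Add(line);
                    if (chunk.Count == chunkSize)
                        Drain(chunk, consumers);
                }
            }
            Drain(chunk, consumers);  // flush the partial chunk at the end of each file
        }
    }

    static void Drain(List<string> chunk, Action<string>[] consumers)
    {
        if (chunk.Count == 0) return;
        // Every consumer reads the same shared chunk, in file order, with no copies.
        Parallel.ForEach(consumers, consumer =>
        {
            foreach (string line in chunk)
                consumer(line);
        });
        chunk.Clear();  // entries are released only after all consumers finish
    }
}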

Am I right with this kind of approach? Either way, how can I implement a good solution?

asked May 30 '14 by shda

1 Answer

I don't agree with having one thread read from the files and write everything into a buffer. With several 1 GB files, that thread would consume too much memory: .NET has a per-object size limit (2 GB by default), and a collection is one object.

You are going to need to throttle the reading of lines. I think you can do that with a BlockingCollection. The bounded capacity of 1,000,000 below keeps the slowest consumer busy, and it also gives some buffer for opening the next file.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using System.Windows;

namespace BlockingCollection2
{
    /// <summary>
    /// Interaction logic for MainWindow.xaml
    /// </summary>
    public partial class MainWindow : Window
    {
        public MainWindow()
        {
            InitializeComponent();
        }

        public static void BC_GetConsumingEnumerableCollection()
        {
            List<string> fileNames = new List<string>();  // add file names
            List<BCtaskBC> bcs = new List<BCtaskBC>();    // add one entry per consumer

            // Kick off a producer task
            Task.Factory.StartNew(() =>
            {
                foreach (string fileName in fileNames)
                {
                    using (StreamReader file = new StreamReader(fileName))
                    {
                        string producerLine;
                        while ((producerLine = file.ReadLine()) != null)
                        {
                            foreach (BCtaskBC bc in bcs)
                            {
                                // string is a reference type but it often acts like a value type;
                                // you may need to make a deep copy of producerLine for this next line
                                bc.BC.Add(producerLine);  // if any queue reaches 1000000 then this blocks
                            }
                        }
                    }
                }
                // Need to do this to keep the Parallel.ForEach below from hanging
                foreach (BCtaskBC bc in bcs)
                {
                    bc.BC.CompleteAdding();
                }
            });

            // Now consume the blocking collections, one consumer per collection, in parallel.
            // Use bc.BC.GetConsumingEnumerable() instead of just bc.BC because the
            // former blocks waiting for completion and the latter would
            // simply take a snapshot of the current state of the underlying collection.
            Parallel.ForEach(bcs, bc =>
            {
                foreach (string consumerLine in bc.BC.GetConsumingEnumerable())
                {
                    bc.BCtask.ProcessTask(consumerLine);
                }
            });
        }

        public class BCtaskBC
        {   // pairs one consumer with its own bounded queue
            private readonly BlockingCollection<string> bc = new BlockingCollection<string>(1000000);  // this throttles the size
            public BCtask BCtask { get; set; }
            public BlockingCollection<string> BC { get { return bc; } }
        }

        public abstract class BCtask
        {   // derive one subclass per consumer; may need to make this thread safe
            public abstract void ProcessTask(string s);
        }
    }
}
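
To make this concrete, here is one hypothetical way to plug a consumer in (WordCountTask is invented for illustration, not part of the answer's code); each entry in bcs pairs a consumer with its own queue:

// Hypothetical concrete consumer: counts words into its local buffer.
public class WordCountTask : BCtask
{
    public long Words { get; private set; }  // local buffer, touched only by this consumer's loop
    public override void ProcessTask(string s)
    {
        Words += s.Split(' ').Length;
    }
}

// Inside BC_GetConsumingEnumerableCollection, before starting the producer:
// bcs.Add(new BCtaskBC { BCtask = new WordCountTask() });
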
answered Oct 31 '22 by paparazzo