I have several files (nearly 1 GB each) of data, where each line is a string.
I need to process each of these files with several hundred consumers. Each consumer does some processing that differs from the others. The consumers do not write anywhere concurrently; they only need the input string, and after processing they update their own local buffers, so they can easily be executed in parallel.
Important: within one specific file, each consumer has to process all lines (without skipping) in the correct order (as they appear in the file). The order in which different files are processed doesn't matter.
Processing of a single line by one consumer is comparatively fast; I expect less than 50 microseconds on a Core i5.
So now I'm looking for a good approach to this problem. This is going to be part of a .NET project, so please let's stick with .NET only (C# is preferable).
I know about TPL and Dataflow, and I guess the most relevant block would be BroadcastBlock. But I think the problem there is that with each line I'd have to wait for all consumers to finish in order to post the next one, which I guess would not be very efficient.
I think that ideally the situation would look something like this:
Am I right with this kind of approach? Either way, how can I implement a good solution?
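For what it's worth, one pattern that avoids the lock-step wait described above is to give each consumer its own bounded buffer and have the reader push every line into each buffer; a slow consumer then only exerts back-pressure once its own buffer fills. Below is a minimal sketch using TPL Dataflow's `BufferBlock<T>` (the names `DataflowSketch` and `RunAsync` and the capacity of 1000 are illustrative, and the `System.Threading.Tasks.Dataflow` package is required):

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowSketch
{
    // Broadcast each line, in order, to every consumer's own bounded buffer.
    // Returns how many lines each consumer saw.
    public static async Task<int[]> RunAsync(string[] lines, int consumerCount)
    {
        var buffers = Enumerable.Range(0, consumerCount)
            .Select(_ => new BufferBlock<string>(
                new DataflowBlockOptions { BoundedCapacity = 1000 })) // illustrative capacity
            .ToArray();

        // Each consumer drains only its own buffer, so lines arrive in file order.
        var counts = new int[consumerCount];
        var consumers = buffers.Select(async (b, i) =>
        {
            while (await b.OutputAvailableAsync())
            {
                b.Receive();  // real per-consumer processing would go here
                counts[i]++;
            }
        }).ToArray();

        // Producer: a full buffer blocks only the send to that one consumer.
        foreach (var line in lines)
            foreach (var b in buffers)
                await b.SendAsync(line);

        foreach (var b in buffers)
            b.Complete(); // lets the consumer loops finish
        await Task.WhenAll(consumers);
        return counts;
    }

    public static async Task Main()
    {
        var counts = await RunAsync(new[] { "a", "b", "c" }, 3);
        Console.WriteLine(string.Join(",", counts)); // prints 3,3,3
    }
}
```

Since each `BufferBlock` is FIFO and has a single reader, every consumer sees all lines in order without the producer waiting for the slowest consumer on every line.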
I don't agree with "one thread reads from the files and writes to the buffer". With several files of 1 GB each, that thread would consume too much memory; .NET has an object size limit, and a collection is one object. You are going to need to throttle reading lines, and I think you can do that with a BlockingCollection. The bounded capacity of 1000000 deals with keeping the slowest consumer busy, and it also gives some buffer for opening the next file.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using System.Windows;

namespace BlockingCollection2
{
    /// <summary>
    /// Interaction logic for MainWindow.xaml
    /// </summary>
    public partial class MainWindow : Window
    {
        public MainWindow()
        {
            InitializeComponent();
        }

        public static void BC_GetConsumingEnumerableCollection()
        {
            List<string> fileNames = new List<string>(); // add file names
            List<BCtaskBC> bcs = new List<BCtaskBC>();   // add one entry per consumer

            // Kick off a producer task
            Task.Factory.StartNew(() =>
            {
                foreach (string fileName in fileNames)
                {
                    using (StreamReader file = new StreamReader(fileName))
                    {
                        string producerLine;
                        while ((producerLine = file.ReadLine()) != null)
                        {
                            foreach (BCtaskBC bc in bcs)
                            {
                                // Strings are immutable in .NET, so every consumer
                                // can safely share the same reference; no copy needed.
                                bc.BC.Add(producerLine); // blocks if this queue reaches 1000000
                            }
                        }
                    }
                }
                // Signal completion so the consuming loops below can finish
                foreach (BCtaskBC bc in bcs)
                {
                    bc.BC.CompleteAdding();
                }
            });

            // Now consume the blocking collections, one parallel body per consumer.
            // Use bc.BC.GetConsumingEnumerable() instead of the collection itself:
            // the former blocks waiting for new items until CompleteAdding is called,
            // while enumerating the collection directly only takes a snapshot.
            Parallel.ForEach(bcs, bc =>
            {
                foreach (string consumerLine in bc.BC.GetConsumingEnumerable())
                {
                    bc.BCtask.ProcessTask(consumerLine);
                }
            });
        }

        public class BCtaskBC
        {
            // The bounded capacity of 1000000 throttles the producer
            private readonly BlockingCollection<string> bc = new BlockingCollection<string>(1000000);
            public BCtask BCtask { get; set; }
            public BlockingCollection<string> BC { get { return bc; } }
        }

        public abstract class BCtask
        {
            // Each consumer derives from this and updates only its own local
            // buffers, so no synchronization across consumers is needed.
            public abstract void ProcessTask(string s);
        }
    }
}
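To show how the pieces above are meant to be wired together, here is a hypothetical, self-contained variant of the same pattern: a concrete consumer with its own local state, one bounded queue per consumer, a single producer task, and Parallel.ForEach draining the queues. All names here (`LineCounter`, `Demo`, `Run`) are illustrative, not part of the code above:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical concrete consumer: its "processing" just counts lines
// in a local field, so no cross-consumer synchronization is needed.
public class LineCounter
{
    public BlockingCollection<string> Queue { get; } = new BlockingCollection<string>(1000000);
    public int LinesSeen { get; private set; }
    public void ProcessTask(string line) => LinesSeen++;
}

public static class Demo
{
    // Returns true when every consumer processed every line.
    public static bool Run(string[] lines, int consumerCount)
    {
        var consumers = Enumerable.Range(0, consumerCount)
                                  .Select(_ => new LineCounter())
                                  .ToList();

        // Producer: every line goes to every consumer's queue, in file order.
        var producer = Task.Run(() =>
        {
            foreach (var line in lines)
                foreach (var c in consumers)
                    c.Queue.Add(line); // blocks when that queue is full
            foreach (var c in consumers)
                c.Queue.CompleteAdding(); // lets the consuming loops end
        });

        // Each consumer drains only its own queue, preserving line order.
        Parallel.ForEach(consumers, c =>
        {
            foreach (var line in c.Queue.GetConsumingEnumerable())
                c.ProcessTask(line);
        });

        producer.Wait();
        return consumers.All(c => c.LinesSeen == lines.Length);
    }

    public static void Main()
    {
        Console.WriteLine(Run(new[] { "line1", "line2", "line3" }, 4)); // prints True
    }
}
```

Because each queue has exactly one consumer and BlockingCollection is FIFO by default, every consumer sees all lines in file order, and the bounded capacity keeps the producer from racing ahead of the slowest consumer by more than the queue size.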