I have built this code to compare a large number of strings in parallel, to make it faster.
I've used a ConcurrentBag so all the threads (tasks) can write to a thread-safe collection, and I then dump this collection to a file.
The issue I have is that the ConcurrentBag<string> log that I dump to a file is filled faster than it is written to the file, so my program consumes more and more RAM until it runs out of memory.
What can I do? Improve the writing to the log? Pause the tasks until the ConcurrentBag is dumped, then resume them? What would be the fastest option?
Here is the code:
CsvWriter csv = new CsvWriter(@"C:\test.csv");
List<Bailleur> bailleurs = DataLoader.LoadBailleurs();
ConcurrentBag<string> log = new ConcurrentBag<string>();
int i = 0;

var taskWriteToLog = new Task(() =>
{
    // Consume the items in the bag
    string item;
    while (true) // (!log.IsEmpty)
    {
        if (!log.IsEmpty)
        {
            if (log.TryTake(out item))
            {
                csv.WriteLine(item);
            }
            else
                Console.WriteLine("Concurrent Bag busy");
        }
        else
        {
            System.Threading.Thread.Sleep(1000);
        }
    }
});
taskWriteToLog.Start();

Parallel.ForEach(bailleurs, s1 =>
{
    foreach (Bailleur s2 in bailleurs)
    {
        var lcs2 = LongestCommonSubsequenceExtensions.LongestCommonSubsequence(s1.Name, s2.Name);
        string line = String.Format("\"LCS\",\"{0}\",\"{1}\",\"{2}\"", s1.Name, s2.Name, lcs2.Item2);
        log.Add(line);
        // Console.WriteLine(line);

        var dic = DiceCoefficientExtensions.DiceCoefficient(s1.Name, s2.Name);
        line = String.Format("\"DICE\",\"{0}\",\"{1}\",\"{2}\"", s1.Name, s2.Name, dic);
        log.Add(line);
        // Console.WriteLine(line);
    }
    i++;
    Console.WriteLine(i);
});
public class CsvWriter
{
    public string FilePath { get; set; }
    private FileStream _fs { get; set; }
    private StreamWriter _sw { get; set; }

    public CsvWriter(string filePath)
    {
        FilePath = filePath;
        _fs = new FileStream(FilePath, FileMode.Create, FileAccess.Write);
        _sw = new StreamWriter(_fs);
    }

    public void WriteLine(string line)
    {
        _sw.WriteLine(line);
    }
}
Don't use the ConcurrentBag directly; use a BlockingCollection with the ConcurrentBag as its backing store (by default it uses a ConcurrentQueue).
One of the constructor overloads lets you set an upper limit on the size of the collection; if the bag is full, it blocks the inserting thread until there is room to insert.
It also gives you GetConsumingEnumerable(), which makes it very easy to take items out of the bag: you just use it in a foreach loop and it keeps feeding your consumer data until CompleteAdding is called. After that, it runs until the bag is empty and then exits like any other IEnumerable that has completed. If the bag "goes dry" before CompleteAdding is called, it blocks the thread and automatically resumes when more data is put into the bag.
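Before applying it to your code, here is a tiny standalone sketch of that behavior (hypothetical names, my illustration rather than part of the original program): the producer blocks whenever the bounded collection is full, and the consumer's foreach exits once CompleteAdding is called and the collection drains.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class BlockingCollectionDemo
{
    static void Main()
    {
        // Bounded capacity of 2: Add blocks while the collection is full.
        var queue = new BlockingCollection<int>(new ConcurrentBag<int>(), 2);

        var consumer = new Task(() =>
        {
            // Blocks when the collection is empty; resumes when data arrives.
            foreach (int item in queue.GetConsumingEnumerable())
                Console.WriteLine("consumed {0}", item);
        });
        consumer.Start();

        for (int n = 0; n < 10; n++)
            queue.Add(n); // blocks here whenever the collection is full

        queue.CompleteAdding(); // no more items: the foreach ends once the collection drains
        consumer.Wait();
    }
}

Applied to your program, it looks like this: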
void ProcessLog()
{
    CsvWriter csv = new CsvWriter(@"C:\test.csv");
    List<Bailleur> bailleurs = DataLoader.LoadBailleurs();

    const int MAX_BAG_SIZE = 500;
    BlockingCollection<string> log = new BlockingCollection<string>(new ConcurrentBag<string>(), MAX_BAG_SIZE);
    int i = 0;

    var taskWriteToLog = new Task(() =>
    {
        // Consume the items in the bag; no need for sleeps or polling.
        // When items are available it runs; when the bag is empty but
        // CompleteAdding has not been called yet, it blocks.
        foreach (string item in log.GetConsumingEnumerable())
        {
            csv.WriteLine(item);
        }
    });
    taskWriteToLog.Start();

    Parallel.ForEach(bailleurs, s1 =>
    {
        //Snip... You can switch to BlockingCollection without any changes to this section of code.
    });

    log.CompleteAdding(); // lets anyone using GetConsumingEnumerable know that no new items are coming, so they can leave the foreach loop once the bag becomes empty.

    taskWriteToLog.Wait(); // wait for the consumer to finish draining the log before returning, otherwise the last lines may never be written.
}
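One more detail, beyond the original question of throughput: CsvWriter writes through a StreamWriter that is never flushed or disposed, so the tail of the log can be lost when the program exits. A minimal sketch of a disposable variant (my addition, same usage otherwise):

using System;
using System.IO;

public class CsvWriter : IDisposable
{
    public string FilePath { get; private set; }
    private readonly StreamWriter _sw;

    public CsvWriter(string filePath)
    {
        FilePath = filePath;
        _sw = new StreamWriter(new FileStream(FilePath, FileMode.Create, FileAccess.Write));
    }

    public void WriteLine(string line)
    {
        _sw.WriteLine(line);
    }

    public void Dispose()
    {
        _sw.Dispose(); // flushes buffered lines and closes the underlying FileStream
    }
}

With this in place, ProcessLog can create the writer in a using block (or call csv.Dispose() after taskWriteToLog.Wait()), so the final buffered lines reach the disk.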