Dumping a ConcurrentBag accessed by multiple threads to a file is not fast enough

I have built this code to run string comparisons between a large number of strings in parallel, to make it faster.

I've used a ConcurrentBag so that all the threads (tasks) can write to a thread-safe collection. I then dump this collection to a file.

The issue I have is that the ConcurrentBag<string> log that I dump to a file is filled faster than it can be written to the file, so my program consumes more and more RAM until it runs out of memory.

My question is: what can I do? Improve the writing to the log? Pause the tasks until the ConcurrentBag is dumped, then resume them? What would be the fastest option?

Here is the code:

CsvWriter csv = new CsvWriter(@"C:\test.csv");

List<Bailleur> bailleurs = DataLoader.LoadBailleurs();
ConcurrentBag<string> log = new ConcurrentBag<string>();
int i = 0;

var taskWriteToLog = new Task(() =>
{
    // Consume the items in the bag
    string item;
    while (true)  //  (!log.IsEmpty)
    {
        if (!log.IsEmpty)
        {
            if (log.TryTake(out item))
            {
                csv.WriteLine(item);
            }
            else
                Console.WriteLine("Concurrent Bag busy");
        }
        else
        {
            System.Threading.Thread.Sleep(1000);
        }
    }
});

taskWriteToLog.Start();

Parallel.ForEach(bailleurs, s1 =>
{
    foreach (Bailleur s2 in bailleurs)
    {
        var lcs2 = LongestCommonSubsequenceExtensions.LongestCommonSubsequence(s1.Name, s2.Name);
        string line = String.Format("\"LCS\",\"{0}\",\"{1}\",\"{2}\"", s1.Name, s2.Name, lcs2.Item2);
        log.Add(line);
        // Console.WriteLine(line);

        var dic = DiceCoefficientExtensions.DiceCoefficient(s1.Name, s2.Name);
        line = String.Format("\"DICE\",\"{0}\",\"{1}\",\"{2}\"", s1.Name, s2.Name, dic);
        log.Add(line);
        // Console.WriteLine(line);
    }
    System.Threading.Interlocked.Increment(ref i); // i++ alone is not thread-safe across the parallel loop's threads
    Console.WriteLine(i);
});

public class CsvWriter : IDisposable
{
    public string FilePath { get; set; }
    private readonly FileStream _fs;
    private readonly StreamWriter _sw;

    public CsvWriter(string filePath)
    {
        FilePath = filePath;
        _fs = new FileStream(FilePath, FileMode.Create, FileAccess.Write);
        _sw = new StreamWriter(_fs);
    }

    public void WriteLine(string line)
    {
        _sw.WriteLine(line);
    }

    public void Dispose()
    {
        // Flush buffered lines and release the file handle.
        _sw.Dispose();
        _fs.Dispose();
    }
}
asked Dec 25 '22 by Arno 2501


1 Answer

Don't use the ConcurrentBag directly; use a BlockingCollection with the concurrent bag as its backing store (by default the backing store is a ConcurrentQueue).

One of the constructor overloads lets you set an upper limit on the size of the collection; if the bag is full, it blocks the inserting thread until there is room to insert.

It also gives you GetConsumingEnumerable(), which makes it very easy to take items out of the bag: you just use it in a foreach loop, and it keeps feeding your consumer data until CompleteAdding is called. After that it runs until the bag is empty, then exits like any other IEnumerable that has completed. If the bag "goes dry" before CompleteAdding is called, it blocks the thread and automatically resumes when more data is put into the bag.
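
Both behaviors are easy to see in a minimal standalone sketch (the names here are illustrative, not from the question): with a bound of 2, the third Add blocks until the consumer takes an item, and the foreach exits only once CompleteAdding has been called and the remaining items have been drained.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class BoundedBagDemo
{
    static void Main()
    {
        // Bag-backed and bounded to 2 items; the parameterless constructor
        // would use a ConcurrentQueue as the backing store instead.
        var bag = new BlockingCollection<string>(new ConcurrentBag<string>(), 2);

        var producer = Task.Run(() =>
        {
            bag.Add("a");
            bag.Add("b");
            bag.Add("c"); // blocks here until the consumer has taken an item
            bag.CompleteAdding();
        });

        // Blocks while the bag is empty; exits once CompleteAdding has been
        // called and the bag has been drained.
        foreach (string item in bag.GetConsumingEnumerable())
            Console.WriteLine(item);

        producer.Wait();
    }
}

Applied to the question's code: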

void ProcessLog()
{
    CsvWriter csv = new CsvWriter(@"C:\test.csv");

    List<Bailleur> bailleurs = DataLoader.LoadBailleurs();

    const int MAX_BAG_SIZE = 500;
    BlockingCollection<string> log = new BlockingCollection<string>(new ConcurrentBag<string>(), MAX_BAG_SIZE);

    int i = 0;

    var taskWriteToLog = new Task(() =>
    {
        // Consume the items in the bag; no need for sleeps or polling. When items
        // are available it runs; when the bag is empty but CompleteAdding has not
        // been called, it blocks.
        foreach(string item in log.GetConsumingEnumerable())
        {
            csv.WriteLine(item);
        }
    });

    taskWriteToLog.Start();

    Parallel.ForEach(bailleurs, s1 =>
    {
        //Snip... You can switch to BlockingCollection without any changes to this section of code.
    });

    log.CompleteAdding(); // Lets anyone using GetConsumingEnumerable know that no new items are coming, so they can leave their foreach loops once the bag is empty.
}
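
One detail the snippet leaves implicit: ProcessLog returns as soon as Parallel.ForEach finishes, so in practice you would wait for the consumer task to drain the collection and flush the writer before returning. A possible ending, assuming CsvWriter disposes its underlying StreamWriter as in the class shown in the question:

    log.CompleteAdding();
    taskWriteToLog.Wait(); // block until every remaining line has been written
    csv.Dispose();         // flush buffered output to disk

Since the consumer task runs for the whole job, creating it with TaskCreationOptions.LongRunning is also worth considering, so that it does not tie up a thread-pool thread.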
answered Feb 02 '23 by Scott Chamberlain