
IEnumerable<T>, Parallel.ForEach and Memory Management

I am reading and processing very large amounts of SQL Server data (tens of millions of rows in, hundreds of millions of rows out). The processing performed on each source row is significant. A single-threaded version is not performing to expectations. My current parallel processing version is performing very well on some smaller batches (300,000 source rows, 1M output rows), but I am running into some Out Of Memory exceptions for very large runs.

The code was significantly inspired by the answers provided here: Is there a way to use the Task Parallel Library(TPL) with SQLDataReader?

Here is the general idea:

Get the source data (data is too large to read into memory, so we will “stream” it)

public static IEnumerable<MyObject> ReadData()
{
    using (SqlConnection con = new SqlConnection(Settings.ConnectionString))
    using (SqlCommand cmd = new SqlCommand(selectionSql, con))
    {
        con.Open();
        using (SqlDataReader dr = cmd.ExecuteReader(CommandBehavior.CloseConnection))
        {
            while (dr.Read())
            {
                // make some decisions here – 1 to n source rows are used
                // to create an instance of MyObject
                yield return new MyObject(/* some parameters */);
            }
        }
    }
}

Once we get to the point of parallel processing, we want to use the SqlBulkCopy object to write the data. Because of this, we don’t want to process individual MyObjects in parallel; instead, we want to perform a bulk copy per thread. To do that, we read from the method above with another IEnumerable that returns a “batch” of MyObjects:

class MyObjectBatch 
{
    public List<MyObject> Items { get; set; }

    public MyObjectBatch (List<MyObject> items)
    {
        this.Items = items;
    }

    public static IEnumerable<MyObjectBatch> Read(int batchSize)
    {
        List<MyObject> items = new List<MyObject>();
        foreach (MyObject o in DataAccessLayer.ReadData())
        {
            items.Add(o);
            if (items.Count >= batchSize)
            {
                yield return new MyObjectBatch(items);                    
                items = new List<MyObject>(); // reset
            }
        }
        if (items.Count > 0) yield return new MyObjectBatch(items);            
    }
}

Finally, we get to the point of processing the “batches” in parallel:

ObjectProcessor processor = new ObjectProcessor();

ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = Settings.MaxThreads };
Parallel.ForEach(MyObjectBatch.Read(Settings.BatchSize), options, batch =>
{
    // Create a container for data processed by this thread
    // the container implements IDataReader
    ProcessedData targetData = new ProcessedData(/* some params */);

    // process the batch… for each MyObject in MyObjectBatch – 
    // results are collected in targetData
    for (int index = 0; index < batch.Items.Count; index++) 
    {
        processor.Process(batch.Items[index], targetData);
    }

    // bulk copy the data – this creates a SqlBulkCopy instance
    // and loads the data to the target table
    DataAccessLayer.BulkCopyData(targetData);

    // explicitly set the batch and targetData to null to try to free resources
    batch = null;
    targetData = null;

});

Everything above has been significantly simplified, but I believe it includes all of the important concepts. Here is the behavior I am seeing:

Performance is very good – for reasonably sized data sets, I am getting very good results.

However, as it processes, the memory consumed continues to grow. For larger data sets, this leads to exceptions.

I have proven through logging that if I slow down the reads from the database, the batch reads slow down as well, and so does the creation of the parallel threads (especially if I set MaxDegreeOfParallelism). I was concerned that I was reading faster than I could process, but if I limit the threads, it should only read what each thread can handle.

Smaller or larger batch sizes have some effect on performance, but the amount of memory used grows consistently with the size of the batch.

Where is there an opportunity to recover some memory here? As my “batches” go out of scope, should that memory be recovered? Is there something I could be doing at the first two layers that would help free some resources?

To answer some questions:

1. Could it be done purely in SQL? No - the processing logic is very complex (and dynamic). Generally speaking, it is doing low-level binary decoding.

2. We have tried SSIS (with some success). The issue is that the definitions of both the source data and the output are very dynamic. SSIS seems to require very strict input and output column definitions, which won't work in this case.

Someone also asked about the ProcessedData object - this is actually fairly simple:

class ProcessedData : IDataReader 
{
    private int _currentIndex = -1;
    private string[] _fieldNames { get; set; }

    public string TechnicalTableName { get; set; }        
    public List<object[]> Values { get; set; }

    public ProcessedData(string schemaName, string tableName, string[] fieldNames)
    {            
        this.TechnicalTableName = "[" + schemaName + "].[" + tableName + "]";
        _fieldNames = fieldNames;            
        this.Values = new List<object[]>();
    }

    #region IDataReader Implementation

    public int FieldCount
    {
        get { return _fieldNames.Length; }
    }

    public string GetName(int i)
    {
        return _fieldNames[i];
    }

    public int GetOrdinal(string name)
    {
        int index = -1;
        for (int i = 0; i < _fieldNames.Length; i++)
        {
            if (_fieldNames[i] == name)
            {
                index = i;
                break;
            }
        }
        return index;
    }

    public object GetValue(int i)
    {
        if (i > (Values[_currentIndex].Length - 1))
        {
            return null;
        }
        else
        {
            return Values[_currentIndex][i];
        }
    }

    public bool Read()
    {
        if ((_currentIndex + 1) < Values.Count)
        {
            _currentIndex++;
            return true;
        }
        else
        {
            return false;
        }
    }

    // Other IDataReader members not used by SqlBulkCopy are not implemented
}
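
For reference, the DataAccessLayer.BulkCopyData method is not shown in the question. A minimal sketch of what it might look like when consuming this reader (the column mapping, timeout setting, and connection handling here are assumptions, not the actual implementation):

// Hypothetical sketch only - the real BulkCopyData is not shown in the question
public static void BulkCopyData(ProcessedData data)
{
    using (SqlConnection con = new SqlConnection(Settings.ConnectionString))
    {
        con.Open();
        using (SqlBulkCopy bulkCopy = new SqlBulkCopy(con))
        {
            bulkCopy.DestinationTableName = data.TechnicalTableName;
            bulkCopy.BulkCopyTimeout = 0; // no timeout for large loads (assumption)

            // map each output column by name so ordinal differences don't matter
            for (int i = 0; i < data.FieldCount; i++)
            {
                bulkCopy.ColumnMappings.Add(data.GetName(i), data.GetName(i));
            }

            // SqlBulkCopy drives the reader: it calls Read(), FieldCount and GetValue()
            bulkCopy.WriteToServer(data);
        }
    }
}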

UPDATE and CONCLUSION:

I received a great deal of valuable input, but wanted to summarize it all into a single conclusion. First, my main question was if there was anything else I could do (with the code I posted) to aggressively reclaim memory. The consensus seems to be that the approach is correct, but that my particular problem is not entirely bound by CPU, so a simple Parallel.ForEach will not manage the processing correctly.

Thanks to usr for his debugging suggestion and his very interesting PLINQ suggestion. Thanks to zmbq for helping clarify what was and wasn't happening.

Finally, anyone else who may be chasing a similar issue will likely find the following discussions helpful:

Parallel.ForEach can cause a "Out Of Memory" exception if working with a enumerable with a large object

Parallel Operation Batching

Asked Mar 08 '14 by snow_FFFFFF

2 Answers

I do not fully understand how Parallel.ForEach is pulling items, but I think by default it pulls more than one to save locking overhead. This means that multiple items might be queued internally inside of Parallel.ForEach. This might cause OOM quickly because your items are very big individually.

You could try giving it a Partitioner that returns single items.
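
For example, a minimal sketch of that idea (EnumerablePartitionerOptions.NoBuffering requires .NET 4.5; the batch-reading call is taken from the question):

// Hand batches to worker threads one at a time instead of letting
// Parallel.ForEach buffer several of them internally.
var partitioner = Partitioner.Create(
    MyObjectBatch.Read(Settings.BatchSize),
    EnumerablePartitionerOptions.NoBuffering);

Parallel.ForEach(partitioner, options, batch =>
{
    // same processing body as before
});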

If that does not help, we need to dig deeper. Debugging memory issues with Parallel and PLINQ is nasty. There was a bug in one of those, for example, that caused old items not to be released quickly.

As a workaround, you could clear the list after processing. That will at least allow all items to be reclaimed deterministically after processing has been done.
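
A small sketch of that workaround, placed at the end of the Parallel.ForEach body from the question:

// after the bulk copy has completed, drop the references held by this batch
// so the processed objects and output rows become collectible immediately
DataAccessLayer.BulkCopyData(targetData);

batch.Items.Clear();
targetData.Values.Clear();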

Regarding the code you posted: it is clean, of high quality, and you are adhering to high standards of resource management. I would not suspect a gross memory or resource leak on your part, though it is still not impossible. You can test this by commenting out the body of the Parallel.ForEach and replacing it with Thread.Sleep(1000 * 60). If the leak persists, you are not at fault.

In my experience, PLINQ makes it easier to get an exact degree of parallelism (because the current version uses the exact DOP you specify, never less, never more). Like this:

GetRows()
.AsBatches(10000)    
.AsParallel().WithDegreeOfParallelism(8)
.Select(TransformItems) //generate rows to write
.AsEnumerable() //leave PLINQ
.SelectMany(x => x) //flatten batches
.AsBatches(1000000) //create new batches with different size
.AsParallel().WithDegreeOfParallelism(2) //PLINQ with different DOP
.ForEach(WriteBatchToDB); //write to DB

This would give you a simple pipeline that pulls from the DB, does CPU-bound work with a specific DOP optimized for the CPU, and writes to the database with much bigger batches and less DOP.

This is quite simple and it should max out CPUs and disks independently with their respective DOP. Play with the DOP numbers.
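
Note that AsBatches and ForEach in the pipeline above are not built-in LINQ operators. A rough sketch of extension methods matching that usage (names and signatures are assumed from the snippet, not taken from an actual library):

public static class PipelineExtensions
{
    // group a sequence into lists of at most batchSize items
    public static IEnumerable<List<T>> AsBatches<T>(this IEnumerable<T> source, int batchSize)
    {
        List<T> batch = new List<T>(batchSize);
        foreach (T item in source)
        {
            batch.Add(item);
            if (batch.Count >= batchSize)
            {
                yield return batch;
                batch = new List<T>(batchSize);
            }
        }
        if (batch.Count > 0) yield return batch;
    }

    // run an action for each element of a PLINQ query; delegating to ForAll
    // keeps the degree of parallelism configured on the query
    public static void ForEach<T>(this ParallelQuery<T> source, Action<T> action)
    {
        source.ForAll(action);
    }
}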

Answered by usr

You're keeping two things in memory - your input data and your output data. You've tried to read and process that data in parallel, but you're not reducing the overall memory footprint - you still end up keeping most of the data in memory, and the more threads you have, the more data you keep in memory.

I guess most of the memory is taken by your output data, as you create 10 times more output records than input records. So you have a few (10? 30? 50) SqlBulkCopy operations.

That is actually too much - you can already gain a lot of speed by writing 100,000 records in bulk. What you should do is split your work: read 10,000-20,000 records, create the output records, SqlBulkCopy them to the database, and repeat. Your memory consumption will drop considerably.

You can, of course, do that in parallel - handle several 10,000 record batches in parallel.
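
A rough sketch of that chunked approach, sequential here for clarity (ReadData, ObjectProcessor, ProcessedData and BulkCopyData are the names from the question; the chunk size is an assumption):

public static void ProcessInChunks(int chunkSize)
{
    ObjectProcessor processor = new ObjectProcessor();
    List<MyObject> chunk = new List<MyObject>(chunkSize);

    foreach (MyObject o in DataAccessLayer.ReadData())
    {
        chunk.Add(o);
        if (chunk.Count >= chunkSize)
        {
            FlushChunk(processor, chunk);
            chunk.Clear();
        }
    }
    if (chunk.Count > 0) FlushChunk(processor, chunk);
}

private static void FlushChunk(ObjectProcessor processor, List<MyObject> chunk)
{
    // constructor arguments are elided here, as they are in the question
    ProcessedData targetData = new ProcessedData(/* some params */);

    foreach (MyObject o in chunk)
    {
        processor.Process(o, targetData);
    }

    // write this chunk and let its input and output rows become collectible
    DataAccessLayer.BulkCopyData(targetData);
}

The same loop can then be parallelized over a handful of chunks at a time, as described above, while keeping each chunk small.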

Just keep in mind that Parallel.ForEach and the thread pool in general are meant to optimize CPU usage. Chances are what limits you is I/O on the database server. While databases can handle concurrency quite well, their limit doesn't depend on the number of cores on your client machine, so you'd better play with the number of concurrent threads and see what's fastest.

Answered by zmbq