Export a large CSV file in parallel to SQL server

I have a large CSV file... 10 columns, 100 million rows, roughly 6 GB in size on my hard disk. I want to read this CSV file line by line and then load the data into a Microsoft SQL Server database using SQL bulk copy. I have read a couple of threads on here and also on the internet. Most people suggest that reading a CSV file in parallel doesn't buy much in terms of efficiency, as the tasks/threads contend for disk access.

What I'm trying to do is read the CSV line by line and add the rows to a blocking collection of size 100K rows. Once this collection is full, I spin up a new task/thread to write the data to SQL Server using the SqlBulkCopy API.

I have written this piece of code, but I'm hitting a runtime error that says "Attempt to invoke bulk copy on an object that has a pending operation." This scenario looks like something that could be easily solved using the .NET 4.0 TPL, but I'm not able to get it to work. Any suggestions on what I'm doing wrong?

    public static void LoadCsvDataInParalleToSqlServer(string fileName, string connectionString, string table, DataColumn[] columns, bool truncate)
    {
        const int inputCollectionBufferSize = 1000000;
        const int bulkInsertBufferCapacity = 100000;
        const int bulkInsertConcurrency = 8;

        var sqlConnection = new SqlConnection(connectionString);
        sqlConnection.Open();

        var sqlBulkCopy = new SqlBulkCopy(sqlConnection.ConnectionString, SqlBulkCopyOptions.TableLock)
        {
            EnableStreaming = true,
            BatchSize = bulkInsertBufferCapacity,
            DestinationTableName = table,
            BulkCopyTimeout = (24 * 60 * 60),
        };

        BlockingCollection<DataRow> rows = new BlockingCollection<DataRow>(inputCollectionBufferSize);
        DataTable dataTable = new DataTable(table);
        dataTable.Columns.AddRange(columns);

        Task loadTask = Task.Factory.StartNew(() =>
            {
                foreach (DataRow row in ReadRows(fileName, dataTable))
                {
                    rows.Add(row);
                }

                rows.CompleteAdding();
            });

        List<Task> insertTasks = new List<Task>(bulkInsertConcurrency);

        for (int i = 0; i < bulkInsertConcurrency; i++)
        {
            insertTasks.Add(Task.Factory.StartNew((x) =>
                {
                    List<DataRow> bulkInsertBuffer = new List<DataRow>(bulkInsertBufferCapacity);

                    foreach (DataRow row in rows.GetConsumingEnumerable())
                    {
                        if (bulkInsertBuffer.Count == bulkInsertBufferCapacity)
                        {
                            SqlBulkCopy bulkCopy = x as SqlBulkCopy;
                            var dataRows = bulkInsertBuffer.ToArray();
                            bulkCopy.WriteToServer(dataRows);
                            Console.WriteLine("Inserted rows " + bulkInsertBuffer.Count);
                            bulkInsertBuffer.Clear();
                        }

                        bulkInsertBuffer.Add(row);
                    }

                },
                sqlBulkCopy));
        }

        loadTask.Wait();
        Task.WaitAll(insertTasks.ToArray());
    }

    private static IEnumerable<DataRow> ReadRows(string fileName, DataTable dataTable)
    {
        using (var textFieldParser = new TextFieldParser(fileName))
        {
            textFieldParser.TextFieldType = FieldType.Delimited;
            textFieldParser.Delimiters = new[] { "," };
            textFieldParser.HasFieldsEnclosedInQuotes = true;

            while (!textFieldParser.EndOfData)
            {
                string[] cols = textFieldParser.ReadFields();

                DataRow row = dataTable.NewRow();

                for (int i = 0; i < cols.Length; i++)
                {
                    if (string.IsNullOrEmpty(cols[i]))
                    {
                        row[i] = DBNull.Value;
                    }
                    else
                    {
                        row[i] = cols[i];
                    }
                }

                yield return row;
            }
        }
    }
asked Oct 22 '14 by user330612

1 Answer

Don't.

Parallel access may or may not give you a faster read of the file (it won't, but I'm not going to fight that battle...), but parallel writes certainly will not give you a faster bulk insert. That is because a minimally logged bulk insert (i.e. the really fast bulk insert) requires a table lock. See Prerequisites for Minimal Logging in Bulk Import:

Minimal logging requires that the target table meets the following conditions:

...
- Table locking is specified (using TABLOCK).
...

Parallel inserts, by definition, cannot obtain concurrent table locks. QED. You are barking up the wrong tree.
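
For reference, here is a minimal single-writer sketch along those lines: one SqlBulkCopy opened with TableLock, fed sequentially and flushed in batches. It reuses the ReadRows helper from the question; the method name and batch size are illustrative assumptions, not part of the original code.

    // Single-writer sketch: one SqlBulkCopy with TableLock, fed sequentially
    // from the question's ReadRows helper. Requires System.Data and
    // System.Data.SqlClient. Method name and batch size are illustrative.
    public static void LoadCsvSingleWriter(string fileName, string connectionString, string table, DataColumn[] columns)
    {
        const int batchCapacity = 100000;

        var dataTable = new DataTable(table);
        dataTable.Columns.AddRange(columns);

        using (var connection = new SqlConnection(connectionString))
        {
            connection.Open();

            using (var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, null)
            {
                DestinationTableName = table,
                BatchSize = batchCapacity,
                BulkCopyTimeout = 0 // 0 = no timeout, for a long-running load
            })
            {
                foreach (DataRow row in ReadRows(fileName, dataTable))
                {
                    dataTable.Rows.Add(row);

                    if (dataTable.Rows.Count == batchCapacity)
                    {
                        bulkCopy.WriteToServer(dataTable); // the single writer holds the table lock
                        dataTable.Clear();
                    }
                }

                if (dataTable.Rows.Count > 0)
                {
                    bulkCopy.WriteToServer(dataTable); // flush the final partial batch
                }
            }
        }
    }

Keeping a single writer that holds TABLOCK is what keeps the load on the minimally logged path; the CSV parsing, not the insert, then becomes the bottleneck.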

Stop getting your advice from random findings on the internet. Read The Data Loading Performance Guide; it is the guide to... performant data loading.

I would recommend you stop reinventing the wheel. Use SSIS; this is exactly what it is designed to handle.

answered Nov 14 '22 by Remus Rusanu