Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

C# fastest way to insert data into SQL database

I am receiving (streamed) data from an external source (over Lightstreamer) into my C# application. My C# application receives data from the listener. The data from the listener are stored in a queue (ConcurrentQueue). The queue is getting cleaned every 0.5 seconds with TryDequeue into a DataTable. The DataTable will then be copy into a SQL database using SqlBulkCopy. The SQL database processes the newly data arrived from the staging table into the final table. I currently receive around 300'000 rows per day (can increae within the next weeks strongly) and my goal is to stay under 1 second from the time I receive the data until they are available in the final SQL table. Currently the maximum rows per seconds I have to process is around 50 rows.

Unfortunately, since receiving more and more data, my logic is getting slower in performance (still far under 1 second, but I wanna keep improving). The main bottleneck (so far) is the processing of the staging data (on the SQL database) into the final table. In order to improve the performance, I would like to switch the staging table into a memory-optimized table. The final table is already a memory-optimized table so they will work together fine for sure.

My questions:

  1. Is there a way to use SqlBulkCopy (out of C#) with memory-optimized tables? (as far as I know there is no way yet)
  2. Any suggestions for the fastest way to write the received data from my C# application into the memory-optimized staging table?

EDIT (with solution):

After the comments/answers and performance evaluations I decided to give up the bulk insert and use SQLCommand to handover a IEnumerable with my data as table-valued parameter into a native compiled stored procedure to store the data directly in my memory-optimized final table (as well as a copy into the "staging" table which now serves as archive). Performance increased significantly (even I did not consider parallelizing the inserts yet (will be at a later stage)).

Here is part of the code:

Memory-optimized user-defined table type (to handover the data from C# into SQL (stored procedure):

CREATE TYPE [Staging].[CityIndexIntradayLivePrices] AS TABLE(
    [CityIndexInstrumentID] [int] NOT NULL,
    [CityIndexTimeStamp] [bigint] NOT NULL,
    [BidPrice] [numeric](18, 8) NOT NULL,
    [AskPrice] [numeric](18, 8) NOT NULL,
    INDEX [IndexCityIndexIntradayLivePrices] NONCLUSTERED 
(
    [CityIndexInstrumentID] ASC,
    [CityIndexTimeStamp] ASC,
    [BidPrice] ASC,
    [AskPrice] ASC
)
)
WITH ( MEMORY_OPTIMIZED = ON )

Native compiled stored procedures to insert the data into final table and staging (which serves as archive in this case):

create procedure [Staging].[spProcessCityIndexIntradayLivePricesStaging]
(
    @ProcessingID int,
    @CityIndexIntradayLivePrices Staging.CityIndexIntradayLivePrices readonly
)
with native_compilation, schemabinding, execute as owner
as 
begin atomic
with (transaction isolation level=snapshot, language=N'us_english')


    -- store prices

    insert into TimeSeries.CityIndexIntradayLivePrices
    (
        ObjectID, 
        PerDateTime, 
        BidPrice, 
        AskPrice, 
        ProcessingID
    )
    select Objects.ObjectID,
    CityIndexTimeStamp,
    CityIndexIntradayLivePricesStaging.BidPrice, 
    CityIndexIntradayLivePricesStaging.AskPrice,
    @ProcessingID
    from @CityIndexIntradayLivePrices CityIndexIntradayLivePricesStaging,
    Objects.Objects
    where Objects.CityIndexInstrumentID = CityIndexIntradayLivePricesStaging.CityIndexInstrumentID


    -- store data in staging table

    insert into Staging.CityIndexIntradayLivePricesStaging
    (
        ImportProcessingID,
        CityIndexInstrumentID,
        CityIndexTimeStamp,
        BidPrice,
        AskPrice
    )
    select @ProcessingID,
    CityIndexInstrumentID,
    CityIndexTimeStamp,
    BidPrice,
    AskPrice
    from @CityIndexIntradayLivePrices


end

IEnumerable filled with the from the queue:

private static IEnumerable<SqlDataRecord> CreateSqlDataRecords()
{


    // set columns (the sequence is important as the sequence will be accordingly to the sequence of columns in the table-value parameter)

    SqlMetaData MetaDataCol1;
    SqlMetaData MetaDataCol2;
    SqlMetaData MetaDataCol3;
    SqlMetaData MetaDataCol4;

    MetaDataCol1 = new SqlMetaData("CityIndexInstrumentID", SqlDbType.Int);
    MetaDataCol2 = new SqlMetaData("CityIndexTimeStamp", SqlDbType.BigInt);
    MetaDataCol3 = new SqlMetaData("BidPrice", SqlDbType.Decimal, 18, 8); // precision 18, 8 scale
    MetaDataCol4 = new SqlMetaData("AskPrice", SqlDbType.Decimal, 18, 8); // precision 18, 8 scale


    // define sql data record with the columns

    SqlDataRecord DataRecord = new SqlDataRecord(new SqlMetaData[] { MetaDataCol1, MetaDataCol2, MetaDataCol3, MetaDataCol4 });


    // remove each price row from queue and add it to the sql data record

    LightstreamerAPI.PriceDTO PriceDTO = new LightstreamerAPI.PriceDTO();

    while (IntradayQuotesQueue.TryDequeue(out PriceDTO))
    {

        DataRecord.SetInt32(0, PriceDTO.MarketID); // city index market id
        DataRecord.SetInt64(1, Convert.ToInt64((PriceDTO.TickDate.Replace(@"\/Date(", "")).Replace(@")\/", ""))); // @ is used to avoid problem with / as escape sequence)
        DataRecord.SetDecimal(2, PriceDTO.Bid); // bid price
        DataRecord.SetDecimal(3, PriceDTO.Offer); // ask price

        yield return DataRecord;

    }


}

Handling the data every 0.5 seconds:

public static void ChildThreadIntradayQuotesHandler(Int32 CityIndexInterfaceProcessingID)
{


    try
    {

        // open new sql connection

        using (SqlConnection TimeSeriesDatabaseSQLConnection = new SqlConnection("Data Source=XXX;Initial Catalog=XXX;Integrated Security=SSPI;MultipleActiveResultSets=false"))
        {


            // open connection

            TimeSeriesDatabaseSQLConnection.Open();


            // endless loop to keep thread alive

            while(true)
            {


                // ensure queue has rows to process (otherwise no need to continue)

                if(IntradayQuotesQueue.Count > 0) 
                {


                    // define stored procedure for sql command

                    SqlCommand InsertCommand = new SqlCommand("Staging.spProcessCityIndexIntradayLivePricesStaging", TimeSeriesDatabaseSQLConnection);


                    // set command type to stored procedure

                    InsertCommand.CommandType = CommandType.StoredProcedure;


                    // define sql parameters (table-value parameter gets data from CreateSqlDataRecords())

                    SqlParameter ParameterCityIndexIntradayLivePrices = InsertCommand.Parameters.AddWithValue("@CityIndexIntradayLivePrices", CreateSqlDataRecords()); // table-valued parameter
                    SqlParameter ParameterProcessingID = InsertCommand.Parameters.AddWithValue("@ProcessingID", CityIndexInterfaceProcessingID); // processing id parameter


                    // set sql db type to structured for table-value paramter (structured = special data type for specifying structured data contained in table-valued parameters)

                    ParameterCityIndexIntradayLivePrices.SqlDbType = SqlDbType.Structured;


                    // execute stored procedure

                    InsertCommand.ExecuteNonQuery();


                }


                // wait 0.5 seconds

                Thread.Sleep(500);


            }

        }

    }
    catch (Exception e)
    {

        // handle error (standard error messages and update processing)

        ThreadErrorHandling(CityIndexInterfaceProcessingID, "ChildThreadIntradayQuotesHandler (handler stopped now)", e);

    };


}
like image 693
Reboon Avatar asked Apr 19 '16 17:04

Reboon


People also ask

What C is used for?

C programming language is a machine-independent programming language that is mainly used to create many types of applications and operating systems such as Windows, and other complicated programs such as the Oracle database, Git, Python interpreter, and games and is considered a programming foundation in the process of ...

Is C language easy?

C is a general-purpose language that most programmers learn before moving on to more complex languages. From Unix and Windows to Tic Tac Toe and Photoshop, several of the most commonly used applications today have been built on C. It is easy to learn because: A simple syntax with only 32 keywords.

What is the full name of C?

In the real sense it has no meaning or full form. It was developed by Dennis Ritchie and Ken Thompson at AT&T bell Lab. First, they used to call it as B language then later they made some improvement into it and renamed it as C and its superscript as C++ which was invented by Dr. Stroustroupe.

Is C programming hard?

C is more difficult to learn than JavaScript, but it's a valuable skill to have because most programming languages are actually implemented in C. This is because C is a “machine-level” language. So learning it will teach you how a computer works and will actually make learning new languages in the future easier.


1 Answers

Use SQL Server 2016 (it's not RTM yet, but it's already much better than 2014 when it comes to memory-optimized tables). Then use either a memory-optimized table variable or just blast a whole lot of native stored procedure calls in a transaction, each doing one insert, depending on what's faster in your scenario (this varies). A few things to watch out for:

  • Doing multiple inserts in one transaction is vital to save on network roundtrips. While in-memory operations are very fast, SQL Server still needs to confirm every operation.
  • Depending on how you're producing data, you may find that parallelizing the inserts can speed things up (don't overdo it; you'll quickly hit the saturation point). Don't try to be very clever yourself here; leverage async/await and/or Parallel.ForEach.
  • If you're passing a table-valued parameter, the easiest way of doing it is to pass a DataTable as the parameter value, but this is not the most efficient way of doing it -- that would be passing an IEnumerable<SqlDataRecord>. You can use an iterator method to generate the values, so only a constant amount of memory is allocated.

You'll have to experiment a bit to find the optimal way of passing through data; this depends a lot on the size of your data and how you're getting it.

like image 157
Jeroen Mostert Avatar answered Sep 24 '22 05:09

Jeroen Mostert