I am receiving (streamed) data from an external source (over Lightstreamer) into my C# application. My C# application receives data from the listener. The data from the listener are stored in a queue (ConcurrentQueue). The queue is getting cleaned every 0.5 seconds with TryDequeue into a DataTable. The DataTable will then be copy into a SQL database using SqlBulkCopy. The SQL database processes the newly data arrived from the staging table into the final table. I currently receive around 300'000 rows per day (can increae within the next weeks strongly) and my goal is to stay under 1 second from the time I receive the data until they are available in the final SQL table. Currently the maximum rows per seconds I have to process is around 50 rows.
Unfortunately, since receiving more and more data, my logic is getting slower in performance (still far under 1 second, but I wanna keep improving). The main bottleneck (so far) is the processing of the staging data (on the SQL database) into the final table. In order to improve the performance, I would like to switch the staging table into a memory-optimized table. The final table is already a memory-optimized table so they will work together fine for sure.
My questions:
EDIT (with solution):
After the comments/answers and performance evaluations I decided to give up the bulk insert and use SQLCommand to handover a IEnumerable with my data as table-valued parameter into a native compiled stored procedure to store the data directly in my memory-optimized final table (as well as a copy into the "staging" table which now serves as archive). Performance increased significantly (even I did not consider parallelizing the inserts yet (will be at a later stage)).
Here is part of the code:
Memory-optimized user-defined table type (to handover the data from C# into SQL (stored procedure):
CREATE TYPE [Staging].[CityIndexIntradayLivePrices] AS TABLE(
[CityIndexInstrumentID] [int] NOT NULL,
[CityIndexTimeStamp] [bigint] NOT NULL,
[BidPrice] [numeric](18, 8) NOT NULL,
[AskPrice] [numeric](18, 8) NOT NULL,
INDEX [IndexCityIndexIntradayLivePrices] NONCLUSTERED
(
[CityIndexInstrumentID] ASC,
[CityIndexTimeStamp] ASC,
[BidPrice] ASC,
[AskPrice] ASC
)
)
WITH ( MEMORY_OPTIMIZED = ON )
Native compiled stored procedures to insert the data into final table and staging (which serves as archive in this case):
create procedure [Staging].[spProcessCityIndexIntradayLivePricesStaging]
(
@ProcessingID int,
@CityIndexIntradayLivePrices Staging.CityIndexIntradayLivePrices readonly
)
with native_compilation, schemabinding, execute as owner
as
begin atomic
with (transaction isolation level=snapshot, language=N'us_english')
-- store prices
insert into TimeSeries.CityIndexIntradayLivePrices
(
ObjectID,
PerDateTime,
BidPrice,
AskPrice,
ProcessingID
)
select Objects.ObjectID,
CityIndexTimeStamp,
CityIndexIntradayLivePricesStaging.BidPrice,
CityIndexIntradayLivePricesStaging.AskPrice,
@ProcessingID
from @CityIndexIntradayLivePrices CityIndexIntradayLivePricesStaging,
Objects.Objects
where Objects.CityIndexInstrumentID = CityIndexIntradayLivePricesStaging.CityIndexInstrumentID
-- store data in staging table
insert into Staging.CityIndexIntradayLivePricesStaging
(
ImportProcessingID,
CityIndexInstrumentID,
CityIndexTimeStamp,
BidPrice,
AskPrice
)
select @ProcessingID,
CityIndexInstrumentID,
CityIndexTimeStamp,
BidPrice,
AskPrice
from @CityIndexIntradayLivePrices
end
IEnumerable filled with the from the queue:
private static IEnumerable<SqlDataRecord> CreateSqlDataRecords()
{
// set columns (the sequence is important as the sequence will be accordingly to the sequence of columns in the table-value parameter)
SqlMetaData MetaDataCol1;
SqlMetaData MetaDataCol2;
SqlMetaData MetaDataCol3;
SqlMetaData MetaDataCol4;
MetaDataCol1 = new SqlMetaData("CityIndexInstrumentID", SqlDbType.Int);
MetaDataCol2 = new SqlMetaData("CityIndexTimeStamp", SqlDbType.BigInt);
MetaDataCol3 = new SqlMetaData("BidPrice", SqlDbType.Decimal, 18, 8); // precision 18, 8 scale
MetaDataCol4 = new SqlMetaData("AskPrice", SqlDbType.Decimal, 18, 8); // precision 18, 8 scale
// define sql data record with the columns
SqlDataRecord DataRecord = new SqlDataRecord(new SqlMetaData[] { MetaDataCol1, MetaDataCol2, MetaDataCol3, MetaDataCol4 });
// remove each price row from queue and add it to the sql data record
LightstreamerAPI.PriceDTO PriceDTO = new LightstreamerAPI.PriceDTO();
while (IntradayQuotesQueue.TryDequeue(out PriceDTO))
{
DataRecord.SetInt32(0, PriceDTO.MarketID); // city index market id
DataRecord.SetInt64(1, Convert.ToInt64((PriceDTO.TickDate.Replace(@"\/Date(", "")).Replace(@")\/", ""))); // @ is used to avoid problem with / as escape sequence)
DataRecord.SetDecimal(2, PriceDTO.Bid); // bid price
DataRecord.SetDecimal(3, PriceDTO.Offer); // ask price
yield return DataRecord;
}
}
Handling the data every 0.5 seconds:
public static void ChildThreadIntradayQuotesHandler(Int32 CityIndexInterfaceProcessingID)
{
try
{
// open new sql connection
using (SqlConnection TimeSeriesDatabaseSQLConnection = new SqlConnection("Data Source=XXX;Initial Catalog=XXX;Integrated Security=SSPI;MultipleActiveResultSets=false"))
{
// open connection
TimeSeriesDatabaseSQLConnection.Open();
// endless loop to keep thread alive
while(true)
{
// ensure queue has rows to process (otherwise no need to continue)
if(IntradayQuotesQueue.Count > 0)
{
// define stored procedure for sql command
SqlCommand InsertCommand = new SqlCommand("Staging.spProcessCityIndexIntradayLivePricesStaging", TimeSeriesDatabaseSQLConnection);
// set command type to stored procedure
InsertCommand.CommandType = CommandType.StoredProcedure;
// define sql parameters (table-value parameter gets data from CreateSqlDataRecords())
SqlParameter ParameterCityIndexIntradayLivePrices = InsertCommand.Parameters.AddWithValue("@CityIndexIntradayLivePrices", CreateSqlDataRecords()); // table-valued parameter
SqlParameter ParameterProcessingID = InsertCommand.Parameters.AddWithValue("@ProcessingID", CityIndexInterfaceProcessingID); // processing id parameter
// set sql db type to structured for table-value paramter (structured = special data type for specifying structured data contained in table-valued parameters)
ParameterCityIndexIntradayLivePrices.SqlDbType = SqlDbType.Structured;
// execute stored procedure
InsertCommand.ExecuteNonQuery();
}
// wait 0.5 seconds
Thread.Sleep(500);
}
}
}
catch (Exception e)
{
// handle error (standard error messages and update processing)
ThreadErrorHandling(CityIndexInterfaceProcessingID, "ChildThreadIntradayQuotesHandler (handler stopped now)", e);
};
}
C programming language is a machine-independent programming language that is mainly used to create many types of applications and operating systems such as Windows, and other complicated programs such as the Oracle database, Git, Python interpreter, and games and is considered a programming foundation in the process of ...
C is a general-purpose language that most programmers learn before moving on to more complex languages. From Unix and Windows to Tic Tac Toe and Photoshop, several of the most commonly used applications today have been built on C. It is easy to learn because: A simple syntax with only 32 keywords.
In the real sense it has no meaning or full form. It was developed by Dennis Ritchie and Ken Thompson at AT&T bell Lab. First, they used to call it as B language then later they made some improvement into it and renamed it as C and its superscript as C++ which was invented by Dr. Stroustroupe.
C is more difficult to learn than JavaScript, but it's a valuable skill to have because most programming languages are actually implemented in C. This is because C is a “machine-level” language. So learning it will teach you how a computer works and will actually make learning new languages in the future easier.
Use SQL Server 2016 (it's not RTM yet, but it's already much better than 2014 when it comes to memory-optimized tables). Then use either a memory-optimized table variable or just blast a whole lot of native stored procedure calls in a transaction, each doing one insert, depending on what's faster in your scenario (this varies). A few things to watch out for:
async
/await
and/or Parallel.ForEach
.DataTable
as the parameter value, but this is not the most efficient way of doing it -- that would be passing an IEnumerable<SqlDataRecord>
. You can use an iterator method to generate the values, so only a constant amount of memory is allocated.You'll have to experiment a bit to find the optimal way of passing through data; this depends a lot on the size of your data and how you're getting it.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With