I'm building a console application that has to process a large amount of data.
Basically, the application grabs references from a DB. For each reference, it parses the content of the corresponding file and makes some changes. The files are HTML files, and the processing does heavy work with RegEx replacements (finding references and transforming them into links). The result is then stored on the file system and sent to an external system.
To summarize the process sequentially:
var refs = GetReferencesFromDB(); // ~5000 DataRows returned
foreach (var reference in refs) // "ref" is a C# keyword, so it cannot be used as a variable name
{
    var filePath = GetFilePath(reference); // This method looks up a previously loaded file list
    var html = File.ReadAllText(filePath); // Read the HTML locally, or from a network drive
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath, convertedHtml); // Write the result locally, or to a network drive
    SendToWs(reference, convertedHtml);
}
My program works correctly but is quite slow. That's why I want to parallelize the process.
So far, I have done a simple parallelization by adding AsParallel:
var refs = GetReferencesFromDB().AsParallel();
refs.ForAll(reference =>
{
    var filePath = GetFilePath(reference);
    var html = File.ReadAllText(filePath);
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath, convertedHtml);
    SendToWs(reference, convertedHtml);
});
This simple change reduced the duration of the process by about 25%. However, my understanding of parallelization is that there won't be much benefit (or it may even get worse) when parallelizing over resources that rely on I/O, because the I/O throughput won't magically double.
That's why I think I should change my approach: rather than parallelizing the whole process, create dependent, chained, queued tasks.
I.e., I should create a flow like:
Queue read file. When finished, queue ParseHtml. When finished, queue both send to WS and write locally. When finished, log the result.
However, I don't know how to implement such a thing.
I feel it will end up as a set of consumer/producer queues, but I didn't find a good sample.
Moreover, I'm not sure there will be any benefit.
Thanks for any advice.
[Edit] In fact, I'm the perfect candidate for using C# 5 (.NET 4.5)... if only it was RTM :)
[Edit 2] Another thing that makes me think it's not correctly parallelized is that, in the Resource Monitor, the CPU, network I/O and disk I/O graphs are not stable: when one is high, the others are low to medium.
You're not leveraging any async I/O APIs in any of your code. Everything you're doing is CPU bound, and all your I/O operations are going to waste CPU resources blocking. AsParallel is for compute-bound tasks; if you want to take advantage of async I/O, you need to leverage the Asynchronous Programming Model (APM) based APIs today in <= v4.0. This is done by looking for BeginXXX/EndXXX methods on the I/O based classes you're using and leveraging those whenever available.
Read this post for starters: TPL TaskFactory.FromAsync vs Tasks with blocking methods
Next, you don't want to use AsParallel in this case anyway. AsParallel enables streaming, which will result in immediately scheduling a new Task per item, but you don't need/want that here. You'd be much better served by partitioning the work using Parallel::ForEach.
Let's see how you can use this knowledge to achieve max concurrency in your specific case:
var refs = GetReferencesFromDB();
// Using Parallel::ForEach here will partition and process your data on separate worker threads
// NOTE: "ref" is a C# keyword, so the lambda parameter is named "reference"
Parallel.ForEach(
    refs,
    reference =>
    {
        string filePath = GetFilePath(reference);
        byte[] fileDataBuffer = new byte[1048576];
        // Need to use FileStream API directly so we can enable async I/O
        FileStream sourceFileStream = new FileStream(
            filePath,
            FileMode.Open,
            FileAccess.Read,
            FileShare.Read,
            8192,
            true);
        // Use FromAsync to read the data from the file
        Task<int> readSourceFileStreamTask = Task<int>.Factory.FromAsync(
            sourceFileStream.BeginRead,
            sourceFileStream.EndRead,
            fileDataBuffer,
            0,
            fileDataBuffer.Length,
            null);
        // Add a continuation that will fire when the async read is completed
        readSourceFileStreamTask.ContinueWith(readSourceFileStreamAntecedent =>
        {
            int sourceFileStreamBytesRead;
            try
            {
                // Determine exactly how many bytes were read
                // NOTE: this will propagate any potential exception that may have occurred in EndRead
                sourceFileStreamBytesRead = readSourceFileStreamAntecedent.Result;
            }
            finally
            {
                // Always clean up the source stream
                sourceFileStream.Close();
                sourceFileStream = null;
            }
            // This is here to make sure you don't end up trying to read files larger than this sample code can handle
            if (sourceFileStreamBytesRead == fileDataBuffer.Length)
            {
                throw new NotSupportedException("You need to implement reading files larger than 1MB. :P");
            }
            // Convert the file data to a string
            string html = Encoding.UTF8.GetString(fileDataBuffer, 0, sourceFileStreamBytesRead);
            // Parse the HTML
            string convertedHtml = ParseHtml(html);
            // This is here to make sure you don't end up trying to write files larger than this sample code can handle
            if (Encoding.UTF8.GetByteCount(convertedHtml) > fileDataBuffer.Length)
            {
                throw new NotSupportedException("You need to implement writing files larger than 1MB. :P");
            }
            // Convert the file data back to bytes for writing, remembering how many bytes were produced
            int destinationBytesToWrite = Encoding.UTF8.GetBytes(convertedHtml, 0, convertedHtml.Length, fileDataBuffer, 0);
            // Need to use FileStream API directly so we can enable async I/O
            // FileMode.Create truncates an existing file so no stale bytes are left after the new content
            FileStream destinationFileStream = new FileStream(
                destinationFilePath,
                FileMode.Create,
                FileAccess.Write,
                FileShare.None,
                8192,
                true);
            // Use FromAsync to write the data to the file (only the bytes actually produced, not the whole buffer)
            Task destinationFileStreamWriteTask = Task.Factory.FromAsync(
                destinationFileStream.BeginWrite,
                destinationFileStream.EndWrite,
                fileDataBuffer,
                0,
                destinationBytesToWrite,
                null);
            // Add a continuation that will fire when the async write is completed
            destinationFileStreamWriteTask.ContinueWith(destinationFileStreamWriteAntecedent =>
            {
                try
                {
                    // NOTE: we call wait here to observe any potential exceptions that might have occurred in EndWrite
                    destinationFileStreamWriteAntecedent.Wait();
                }
                finally
                {
                    // Always close the destination file stream
                    destinationFileStream.Close();
                    destinationFileStream = null;
                }
            },
            TaskContinuationOptions.AttachedToParent);
            // Send to external system **concurrent** to writing to destination file system above
            SendToWs(reference, convertedHtml);
        },
        TaskContinuationOptions.AttachedToParent);
    });
Now, here are a few notes:
TaskContinuationOptions.AttachedToParent: this is very important, as it will prevent the worker thread that Parallel::ForEach starts the work with from completing until all the underlying async calls have completed. If this was not here, you would kick off work for all 5000 items concurrently, which would pollute the TPL subsystem with thousands of scheduled Tasks and not scale properly at all.
The good news is your logic could be easily separated into steps that go into a producer-consumer pipeline.
If you are using .NET 4.0 you can use the BlockingCollection data structure as the backbone for each step's producer-consumer queue. The main thread will enqueue each work item into step 1's queue, where it will be picked up, processed, and then forwarded on to step 2's queue, and so on.
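As a rough illustration of that pipeline, here is a minimal sketch with one BlockingCollection per hand-off. The class name PipelineSketch, the queue capacity of 16, and the ParseHtml stand-in are assumptions for the example only; in-memory strings replace the real file reads, writes and web-service call.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PipelineSketch
{
    // Hypothetical stand-in for the regex-heavy ParseHtml step from the question.
    static string ParseHtml(string html) { return html.ToUpperInvariant(); }

    public static List<string> Run(IEnumerable<string> inputs)
    {
        // Bounded queues keep a fast stage from running arbitrarily far ahead of a slow one.
        var toParse = new BlockingCollection<string>(16);
        var toStore = new BlockingCollection<string>(16);
        var results = new List<string>();

        // Step 1: "read" stage. The input strings stand in for file contents.
        var reader = Task.Factory.StartNew(() =>
        {
            try { foreach (var html in inputs) toParse.Add(html); }
            finally { toParse.CompleteAdding(); } // tells step 2 that no more items will arrive
        }, TaskCreationOptions.LongRunning);

        // Step 2: parse stage. Consumes step 1's queue and feeds step 3's queue.
        var parser = Task.Factory.StartNew(() =>
        {
            try
            {
                foreach (var html in toParse.GetConsumingEnumerable())
                    toStore.Add(ParseHtml(html));
            }
            finally { toStore.CompleteAdding(); }
        }, TaskCreationOptions.LongRunning);

        // Step 3: "write/send" stage. Collecting into a list stands in for the real I/O.
        var writer = Task.Factory.StartNew(() =>
        {
            foreach (var converted in toStore.GetConsumingEnumerable())
                results.Add(converted);
        }, TaskCreationOptions.LongRunning);

        // A real pipeline would also need cancellation so that a failing stage
        // does not leave an upstream producer blocked on a full queue.
        Task.WaitAll(reader, parser, writer);
        return results;
    }
}
```

With a single consumer per stage the items come out in input order; running several parser tasks against the same queue would raise throughput for the CPU-bound step at the cost of that ordering.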
If you are willing to move on to the Async CTP then you can take advantage of the new TPL Dataflow structures for this as well. There is the BufferBlock<T> data structure, among others, that behaves in a similar manner to BlockingCollection and integrates well with the new async and await keywords.
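A comparable sketch with TPL Dataflow might look like the following. It assumes the API as shipped in the System.Threading.Tasks.Dataflow NuGet package, which may differ from the Async CTP preview, and it uses TransformBlock and ActionBlock rather than BufferBlock<T> because they carry the per-step work directly. The class name DataflowSketch, the capacity of 32, and the ParseHtml stand-in are assumptions for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // assumed: NuGet package System.Threading.Tasks.Dataflow

public static class DataflowSketch
{
    // Hypothetical stand-in for the regex-heavy ParseHtml step from the question.
    static string ParseHtml(string html) { return html.ToUpperInvariant(); }

    public static async Task<List<string>> RunAsync(IEnumerable<string> inputs)
    {
        var results = new List<string>();

        // CPU-bound stage: several items may be parsed at once.
        // Output order still matches input order by default.
        var parse = new TransformBlock<string, string>(
            html => ParseHtml(html),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                BoundedCapacity = 32
            });

        // Final stage: processes one item at a time by default, so a plain List is safe.
        // In the real program this is where the file write and web-service call would go.
        var store = new ActionBlock<string>(
            converted => results.Add(converted),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 32 });

        // Forward parsed items, and forward completion when the parse block finishes.
        parse.LinkTo(store, new DataflowLinkOptions { PropagateCompletion = true });

        // SendAsync waits without blocking a thread when the bounded block is full.
        foreach (var html in inputs)
            await parse.SendAsync(html);

        parse.Complete();
        await store.Completion;
        return results;
    }
}
```

The bounded capacities provide back-pressure, so a slow final stage throttles the stages before it instead of letting thousands of items pile up in memory.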
Because your algorithm is IO bound the producer-consumer strategies may not get you the performance boost you are looking for, but at least you will have a very elegant solution that would scale well if you could increase the IO throughput. I am afraid steps 1 and 3 will be the bottlenecks and the pipeline will not balance well, but it is worth experimenting with.
Just a suggestion, but have you looked into the Consumer/Producer pattern? A certain number of threads would read your files from disk and feed the content to a queue. Then another set of threads, known as the consumers, would "consume" the queue as it's filled. http://zone.ni.com/devzone/cda/tut/p/id/3023