I wrote the following method to batch process a huge CSV file. The idea is to read a chunk of lines from the file into memory, then partition that chunk of lines into batches of fixed size. Once we have the batches, send them to a server (sync or async), which might take a while.
private static void BatchProcess(string filePath, int chunkSize, int batchSize)
{
    List<string> chunk = new List<string>(chunkSize);
    foreach (var line in File.ReadLines(filePath))
    {
        if (chunk.Count == chunk.Capacity)
        {
            // Partition each chunk into smaller chunks grouped on column 1
            var partitions = chunk.GroupBy(c => c.Split(',')[0], (key, g) => g);
            // Further break down the chunks into batch-size groups
            var groups = partitions.Select(x => x.Select((i, index) =>
                new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));
            // Get batches from groups
            var batches = groups.SelectMany(x => x)
                .Select(y => y.Select(z => z)).ToList();
            // Process all batches asynchronously
            batches.AsParallel().ForAll(async b =>
            {
                WebClient client = new WebClient();
                byte[] bytes = System.Text.Encoding.ASCII
                    .GetBytes(b.SelectMany(x => x).ToArray());
                await client.UploadDataTaskAsync("myserver.com", bytes);
            });
            // clear the chunk
            chunk.Clear();
        }
        chunk.Add(line);
    }
}
This piece of code doesn't seem to be very efficient, for two reasons:

1. The main thread that reads from the CSV file is blocked until all the partitions are processed.
2. AsParallel blocks until all the tasks are finished. So if there are more threads available in the thread pool to do work, I'm not using them, because the number of tasks is bounded by the number of partitions.

The batchSize is fixed and can't be changed, but chunkSize is tunable for performance. I can choose a chunkSize large enough that the number of batches created is far greater than the number of threads available on the system, but the AsParallel().ForAll call still blocks until all the tasks are finished.

How can I change the code so that all available threads in the system are utilized to do the work without sitting idle? I'm thinking I could use a BlockingCollection to store the batches, but I'm not sure what capacity to give it, since the number of batches is dynamic in each chunk.
Any ideas on how to use TPL to maximize thread utilization so that most available threads on the system are always doing stuff?
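For what it's worth, this is roughly the BlockingCollection shape I have in mind (just a sketch; the capacity of 100 and the one-consumer-per-core choice are arbitrary guesses, and the capacity is exactly the part I'm unsure about):

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

class BlockingCollectionSketch
{
    static void ProcessBatches(IEnumerable<IEnumerable<string>> batches)
    {
        // Bounded queue: the producer blocks on Add() when consumers fall behind.
        // The capacity of 100 is an arbitrary guess.
        var queue = new BlockingCollection<IEnumerable<string>>(boundedCapacity: 100);

        // One consumer per core; each drains batches as they arrive.
        var consumers = Enumerable.Range(0, Environment.ProcessorCount)
            .Select(_ => Task.Run(() =>
            {
                foreach (var batch in queue.GetConsumingEnumerable())
                {
                    // upload the batch here
                }
            }))
            .ToArray();

        foreach (var batch in batches)
        {
            queue.Add(batch); // blocks while the queue is full
        }

        queue.CompleteAdding(); // lets consumers drain the queue and exit
        Task.WaitAll(consumers);
    }
}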
UPDATE: This is what I have so far using TPL Dataflow. Is this correct?
private static void UploadData(string filePath, int chunkSize, int batchSize)
{
    var buffer1 = new BatchBlock<string>(chunkSize);
    var buffer2 = new BufferBlock<IEnumerable<string>>();
    var action1 = new ActionBlock<string[]>(t =>
    {
        Console.WriteLine("Got a chunk of lines " + t.Count());
        // Partition each chunk into smaller chunks grouped on column 1
        var partitions = t.GroupBy(c => c.Split(',')[0], (key, g) => g);
        // Further break down the chunks into batch-size groups
        var groups = partitions.Select(x => x.Select((i, index) =>
            new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));
        // Get batches from groups
        var batches = groups.SelectMany(x => x).Select(y => y.Select(z => z));
        foreach (var batch in batches)
        {
            buffer2.Post(batch);
        }
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    buffer1.LinkTo(action1, new DataflowLinkOptions
        { PropagateCompletion = true });
    var action2 = new TransformBlock<IEnumerable<string>,
        IEnumerable<string>>(async b =>
    {
        await ExecuteBatch(b);
        return b;
    }, new ExecutionDataflowBlockOptions
        { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
    buffer2.LinkTo(action2, new DataflowLinkOptions
        { PropagateCompletion = true });
    var action3 = new ActionBlock<IEnumerable<string>>(b =>
    {
        Console.WriteLine("Finished executing a batch");
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    action2.LinkTo(action3, new DataflowLinkOptions
        { PropagateCompletion = true });
    Task produceTask = Task.Factory.StartNew(() =>
    {
        foreach (var line in File.ReadLines(filePath))
        {
            buffer1.Post(line);
        }
        // Once marked complete, the entire data flow will signal a stop
        // for all new items
        Console.WriteLine("Finished producing");
        buffer1.Complete();
    });
    produceTask.Wait();
    Console.WriteLine("Producer complete");
    action1.Completion.Wait();  // Will add all the items to buffer2
    Console.WriteLine("Action1 complete");
    buffer2.Complete();         // Will not get any new items
    action2.Completion.Wait();  // Process the batches and then complete
    action3.Completion.Wait();
    Console.WriteLine("Process complete");
    Console.ReadLine();
}
You were close. In TPL Dataflow, data flows from one block to the next, and you should try to keep to that paradigm. So, for example, action1 should be a TransformManyBlock, because an ActionBlock is an ITargetBlock (i.e. a termination block).
When you specify PropagateCompletion on a link, the completion event is automatically routed through the block, so you only need a single Wait() on the last block. Think of it as a domino chain: you call Complete() on the first block and it propagates through the chain to the last block.
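Here is a minimal sketch of that domino chain (hypothetical blocks, just to show the pattern):

using System;
using System.Threading.Tasks.Dataflow;

class CompletionDemo
{
    static void Main()
    {
        // Two-block chain: Complete() on the head propagates to the tail.
        var first = new TransformBlock<int, int>(x => x * 2);
        var last = new ActionBlock<int>(x => Console.WriteLine(x));
        first.LinkTo(last, new DataflowLinkOptions { PropagateCompletion = true });

        first.Post(1);
        first.Complete();       // push the first domino
        last.Completion.Wait(); // the only Wait() you need
    }
}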
You should also consider what you are multi-threading and why; your example is heavily I/O bound, and I don't think tying up a bunch of threads to wait for I/O completion is the right solution.
Finally, do mind what is blocking and what is not. In your example, buffer1.Post(...) is not a blocking call, so you have no reason to wrap it in a task.
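If you do want the file reader to experience backpressure, give the head block a BoundedCapacity and use SendAsync instead of Post: on a bounded block, Post returns false when the block is full, while SendAsync returns a task that completes once the item is accepted. A sketch (the capacity value is an arbitrary example):

using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class BoundedProducerSketch
{
    static async Task ProduceAsync(string filePath, int chunkSize)
    {
        // For a BatchBlock, BoundedCapacity must be at least the batch size;
        // chunkSize * 4 here is an arbitrary choice.
        var lineBuffer = new BatchBlock<string>(chunkSize,
            new GroupingDataflowBlockOptions { BoundedCapacity = chunkSize * 4 });

        foreach (var line in File.ReadLines(filePath))
        {
            await lineBuffer.SendAsync(line); // waits while the block is full
        }
        lineBuffer.Complete();
    }
}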
I have written the following sample code that uses TPL Dataflow:
static void Main(string[] args)
{
    var filePath = "C:\\test.csv";
    var chunkSize = 1024;
    var batchSize = 128;

    var linkCompletion = new DataflowLinkOptions
    {
        PropagateCompletion = true
    };

    var uploadData = new ActionBlock<IEnumerable<string>>(
        async (data) =>
        {
            WebClient client = new WebClient();
            var payload = data.SelectMany(x => x).ToArray();
            byte[] bytes = System.Text.Encoding.ASCII.GetBytes(payload);
            //await client.UploadDataTaskAsync("myserver.com", bytes);
            await Task.Delay(2000);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded /* Prefer to limit that to some reasonable value */ });

    var lineBuffer = new BatchBlock<string>(chunkSize);

    var splitData = new TransformManyBlock<IEnumerable<string>, IEnumerable<string>>(
        (data) =>
        {
            // Partition each chunk into smaller chunks grouped on column 1
            var partitions = data.GroupBy(c => c.Split(',')[0]);
            // Further break down the chunks into batch-size groups
            var groups = partitions.Select(x => x.Select((i, index) => new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));
            // Get batches from groups
            var batches = groups.SelectMany(x => x).Select(y => y.Select(z => z));
            // Don't forget to enumerate before returning
            return batches.ToList();
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

    lineBuffer.LinkTo(splitData, linkCompletion);
    splitData.LinkTo(uploadData, linkCompletion);

    foreach (var line in File.ReadLines(filePath))
    {
        lineBuffer.Post(line);
    }
    lineBuffer.Complete();

    // Wait for uploads to finish
    uploadData.Completion.Wait();
}
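As the inline comment hints, unbounded parallelism on uploadData can flood the server and the thread pool with pending uploads. A capped variant might look like this (both numbers are arbitrary and would need tuning):

var uploadOptions = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 8, // arbitrary cap; tune to what the server tolerates
    BoundedCapacity = 32        // also bounds queued batches, giving backpressure upstream
};

With BoundedCapacity set, splitData can only hand over new batches as uploadData frees up space, which keeps memory use flat for very large files.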