I have a use case where I need to:

1. Read a large XML input file node by node (via XmlReader),
2. run calculations on each node, and
3. write the results to an XML output file (via XmlWriter).
Input looks something like this:
<Root>
    <Input>
        <Case>ABC123</Case>
        <State>MA</State>
        <Investor>Goldman</Investor>
    </Input>
    <Input>
        <Case>BCD234</Case>
        <State>CA</State>
        <Investor>Goldman</Investor>
    </Input>
</Root>
and the output:
<Results>
    <Output>
        <Case>ABC123</Case>
        <State>MA</State>
        <Investor>Goldman</Investor>
        <Price>75.00</Price>
        <Product>Blah</Product>
    </Output>
    <Output>
        <Case>BCD234</Case>
        <State>CA</State>
        <Investor>Goldman</Investor>
        <Price>55.00</Price>
        <Product>Ack</Product>
    </Output>
</Results>
I would like to run the calculations in parallel; the typical input file may have 50,000 input nodes, and the total processing time without threading may be 90 minutes. Approximately 90% of the processing time is spent on step #2 (the calculations).
I can iterate over the XmlReader in parallel easily enough:
static IEnumerable<XElement> EnumerateAxis(XmlReader reader, string axis)
{
    reader.MoveToContent();
    while (!reader.EOF)
    {
        if (reader.NodeType == XmlNodeType.Element && reader.Name == axis)
        {
            // XElement.ReadFrom consumes the element and leaves the
            // reader positioned on the node that follows it, so we
            // must not call Read() again here, or we could skip an
            // immediately adjacent sibling element.
            if (XElement.ReadFrom(reader) is XElement el)
                yield return el;
        }
        else
        {
            reader.Read();
        }
    }
}
...
Parallel.ForEach(EnumerateAxis(reader, "Input"), node =>
{
    // do calc
    // lock the XmlWriter, write, unlock
});
I'm currently inclined to use a lock when writing to the XmlWriter to ensure thread safety.
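Roughly like this (`Calculate` and `WriteResult` are stand-ins for my actual calculation and serialisation code):

```csharp
var gate = new object();

using (var writer = XmlWriter.Create("results.xml"))
{
    writer.WriteStartElement("Results");

    Parallel.ForEach(EnumerateAxis(reader, "Input"), node =>
    {
        var result = Calculate(node); // the expensive step, runs in parallel

        // Serialise the writes so only one thread touches the writer at a time.
        lock (gate)
        {
            WriteResult(writer, result); // writes one <Output> element
        }
    });

    writer.WriteEndElement();
}
```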
Is there a more elegant way to handle the XmlWriter in this case? Specifically, should I have the Parallel.ForEach code pass the results back to the originating thread and have that thread handle the XmlWriter, avoiding the need to lock? If so, I'm unsure of the correct approach for this.
This is my favourite kind of problem: one which can be solved with a pipeline.
Please note that depending on your circumstances this approach may actually negatively impact performance, but as you've explicitly asked how you can use the writer on a dedicated thread, the below code demonstrates exactly that.
Disclaimer: you should ideally consider TPL Dataflow for this, but it's not something I'm well-versed in, so I'll just take the familiar Task + BlockingCollection&lt;T&gt; route.
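For the curious, here's a rough, untested sketch of what the Dataflow equivalent might look like (`Calculate` and `WriteResult` are placeholders for your own code; requires the System.Threading.Tasks.Dataflow package):

```csharp
// Calculation stage: runs in parallel, throttled by BoundedCapacity.
var processStage = new TransformBlock<XElement, Result>(
    node => Calculate(node),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount,
        BoundedCapacity = 100
    });

// Write stage: single-threaded by default, so no locking needed.
var writeStage = new ActionBlock<Result>(
    result => WriteResult(writer, result),
    new ExecutionDataflowBlockOptions { BoundedCapacity = 100 });

processStage.LinkTo(writeStage, new DataflowLinkOptions { PropagateCompletion = true });

foreach (var node in EnumerateAxis(reader, "Input"))
    processStage.SendAsync(node).Wait(); // blocks when the pipeline is full

processStage.Complete();
writeStage.Completion.Wait();
```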
At first I was going to suggest a 3-stage pipeline (read, process, write), but then I realised that you've already combined the first two stages with the way you "stream" the nodes as they're being read and feed them to your Parallel.ForEach (yes, you've already implemented a pipeline of sorts). Even better - less thread synchronisation.
With that in mind, the code now becomes:
public class Result
{
    public string Case { get; set; }
    public string State { get; set; }
    public string Investor { get; set; }
    public decimal Price { get; set; }
    public string Product { get; set; }
}
...
using (var reader = CreateXmlReader())
{
    // I highly doubt that this collection will
    // ever reach its bounded capacity since
    // the processing stage takes so long,
    // but in case it does, Parallel.ForEach
    // will be throttled.
    using (var handover = new BlockingCollection<Result>(boundedCapacity: 100))
    {
        var processStage = Task.Run(() =>
        {
            try
            {
                Parallel.ForEach(EnumerateAxis(reader, "Input"), node =>
                {
                    // Do calc.
                    Thread.Sleep(1000);

                    // Hand over to the writer.
                    // This handover is not blocking (unless our
                    // blocking collection has reached its bounded
                    // capacity, which would indicate that the
                    // writer is running slower than expected).
                    handover.Add(new Result());
                });
            }
            finally
            {
                handover.CompleteAdding();
            }
        });

        var writeStage = Task.Run(() =>
        {
            using (var writer = CreateXmlWriter())
            {
                foreach (var result in handover.GetConsumingEnumerable())
                {
                    // Write element.
                }
            }
        });

        // Note: the two stages are now running in parallel.
        // You could technically use Parallel.Invoke to
        // achieve the same result with a bit less code.
        Task.WaitAll(processStage, writeStage);
    }
}
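The "Write element" placeholder might look something like this (element names taken from your sample output; a hypothetical helper, adjust formatting to taste):

```csharp
using System.Globalization;
using System.Xml;

static void WriteResult(XmlWriter writer, Result result)
{
    writer.WriteStartElement("Output");
    writer.WriteElementString("Case", result.Case);
    writer.WriteElementString("State", result.State);
    writer.WriteElementString("Investor", result.Investor);
    // Two decimal places, invariant culture, matching "75.00" in the sample.
    writer.WriteElementString("Price", result.Price.ToString("F2", CultureInfo.InvariantCulture));
    writer.WriteElementString("Product", result.Product);
    writer.WriteEndElement();
}
```

You'd call `writer.WriteStartElement("Results")` before the consuming loop and `writer.WriteEndElement()` after it, so all the `<Output>` elements land inside the `<Results>` root.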