I am trying to implement a data processing pipeline using TPL Dataflow. However, I am relatively new to dataflow and not completely sure how to use it properly for the problem I am trying to solve.
Problem:
I am trying to iterate through a list of files and process each file to read some data and then further process that data. Each file is roughly 700MB to 1GB in size and contains JSON data. In order to process these files in parallel and not run out of memory, I am trying to use IEnumerable<> with yield return and then further process the data.
Once I get the list of files, I want to process a maximum of 4-5 files at a time in parallel. My confusion comes from mixing IEnumerable<> and yield return with async/await and dataflow:
1. I came across this answer by svick, but I am still not sure how to convert IEnumerable<> to ISourceBlock, then link all the blocks together and track completion.
2. The producer will be really fast (it just goes through the list of files), but the consumer will be very slow (processing each file - reading the data, deserializing the JSON). In this case, how do I track completion?
3. Should I use the LinkTo feature of dataflow blocks to connect the various blocks, or use methods such as OutputAvailableAsync() and ReceiveAsync() to propagate data from one block to another?
Code:
private const int ProcessingSize= 4;
private BufferBlock<string> _fileBufferBlock;
private ActionBlock<string> _processingBlock;
private BufferBlock<DataType> _messageBufferBlock;
public Task ProduceAsync()
{
PrepareDataflow(token);
var bufferTask = ListFilesAsync(_fileBufferBlock, token);
var tasks = new List<Task> { bufferTask, _processingBlock.Completion };
return Task.WhenAll(tasks);
}
private async Task ListFilesAsync(ITargetBlock<string> targetBlock, CancellationToken token)
{
...
// Get list of file Uris
...
foreach(var fileNameUri in fileNameUris)
await targetBlock.SendAsync(fileNameUri, token);
targetBlock.Complete();
}
private async Task ProcessFileAsync(string fileNameUri, CancellationToken token)
{
var httpClient = new HttpClient();
try
{
using (var stream = await httpClient.GetStreamAsync(fileNameUri))
using (var sr = new StreamReader(stream))
using (var jsonTextReader = new JsonTextReader(sr))
{
while (jsonTextReader.Read())
{
if (jsonTextReader.TokenType == JsonToken.StartObject)
{
try
{
var data = _jsonSerializer.Deserialize<DataType>(jsonTextReader);
await _messageBufferBlock.SendAsync(data, token);
}
catch (Exception ex)
{
_logger.Error(ex, $"JSON deserialization failed - {fileNameUri}");
}
}
}
}
}
catch(Exception ex)
{
// Should throw?
// Or if converted to block then report using Fault() method?
}
finally
{
httpClient.Dispose();
buffer.Complete();
}
}
private void PrepareDataflow(CancellationToken token)
{
_fileBufferBlock = new BufferBlock<string>(new DataflowBlockOptions
{
CancellationToken = token
});
var actionExecuteOptions = new ExecutionDataflowBlockOptions
{
CancellationToken = token,
BoundedCapacity = ProcessingSize,
MaxMessagesPerTask = 1,
MaxDegreeOfParallelism = ProcessingSize
};
_processingBlock = new ActionBlock<string>(async fileName =>
{
try
{
await ProcessFileAsync(fileName, token);
}
catch (Exception ex)
{
_logger.Fatal(ex, $"Failed to process file: {fileName}, Error: {ex.Message}");
// Should fault the block?
}
}, actionExecuteOptions);
_fileBufferBlock.LinkTo(_processingBlock, new DataflowLinkOptions { PropagateCompletion = true });
_messageBufferBlock = new BufferBlock<DataType>(new ExecutionDataflowBlockOptions
{
CancellationToken = token,
BoundedCapacity = 50000
});
_messageBufferBlock.LinkTo(DataflowBlock.NullTarget<DataType>());
}
In the above code, I am not using IEnumerable<DataType> and yield return as I cannot use them with async/await. So I am linking the input buffer to an ActionBlock<DataType>, which in turn posts to another queue. However, by using an ActionBlock<> I cannot link it to the next block for further processing and have to manually Post/SendAsync from the ActionBlock<> to the BufferBlock<>. Also, in this case, I am not sure how to track completion.
This code works, but I am sure there is a better solution than this, where I can just link all the blocks (instead of using an ActionBlock<DataType> and then sending messages from it to a BufferBlock<DataType>).
Another option could be to convert IEnumerable<> to IObservable<> using Rx, but again I am not very familiar with Rx and don't know exactly how to mix TPL Dataflow and Rx.
Question 1
You plug an IEnumerable<T> producer into your TPL Dataflow chain by using Post or SendAsync directly on the consumer block, as follows:
foreach (string fileNameUri in fileNameUris)
{
await _processingBlock.SendAsync(fileNameUri).ConfigureAwait(false);
}
You can also use a BufferBlock<TInput>, but in your case it actually seems rather unnecessary (or even harmful - see the next part).
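For example (a sketch based on the question's code, keeping the original member names), ListFilesAsync could feed _processingBlock directly and skip the intermediate BufferBlock<string> entirely:
private async Task ListFilesAsync(CancellationToken token)
{
    // ... get the list of file URIs as before ...
    foreach (var fileNameUri in fileNameUris)
    {
        // Honours the ActionBlock's BoundedCapacity - see Question 2 below.
        await _processingBlock.SendAsync(fileNameUri, token);
    }
    // No more files - let the processing block drain its buffer and complete.
    _processingBlock.Complete();
}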
Question 2
When would you prefer SendAsync instead of Post? If your producer runs faster than the URIs can be processed (and you have indicated this to be the case), and you choose to give your _processingBlock a BoundedCapacity, then when the block's internal buffer reaches the specified capacity, your SendAsync will "hang" until a buffer slot frees up, and your foreach loop will be throttled. This feedback mechanism creates back pressure and ensures that you don't run out of memory.
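To make the difference concrete, here is a small standalone sketch (with hypothetical values, not taken from the question): once a block with a BoundedCapacity is full, Post declines the message immediately and returns false, whereas SendAsync asynchronously waits for a slot to free up, which is what throttles the caller.
var block = new ActionBlock<int>(async n =>
{
    await Task.Delay(1000); // deliberately slow consumer
    Debug.Print(n.ToString());
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });

Debug.Print(block.Post(1).ToString()); // True  - slot available
Debug.Print(block.Post(2).ToString()); // True  - buffer is now full
Debug.Print(block.Post(3).ToString()); // False - declined immediately, the message is lost

await block.SendAsync(4); // waits for a free slot instead of declining

block.Complete();
await block.Completion;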
Question 3
You should definitely use the LinkTo method to link your blocks in most cases. Unfortunately yours is a corner case due to the interplay of IDisposable and (potentially) very large sequences. So your completion will flow automatically between the buffer and processing blocks (due to LinkTo), but after that you need to propagate it manually. This is tricky, but doable.
I'll illustrate this with a "Hello World" example where the producer iterates over each character and the consumer (which is really slow) outputs each character to the Debug window.
Note: LinkTo is not present.
// REALLY slow consumer.
var consumer = new ActionBlock<char>(async c =>
{
await Task.Delay(100);
Debug.Print(c.ToString());
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
var producer = new ActionBlock<string>(async s =>
{
foreach (char c in s)
{
await consumer.SendAsync(c);
Debug.Print($"Yielded {c}");
}
});
try
{
producer.Post("Hello world");
producer.Complete();
await producer.Completion;
}
finally
{
consumer.Complete();
}
// Observe combined producer and consumer completion/exceptions/cancellation.
await Task.WhenAll(producer.Completion, consumer.Completion);
This outputs:
Yielded H
H
Yielded e
e
Yielded l
l
Yielded l
l
Yielded o
o
Yielded
 
Yielded w
w
Yielded o
o
Yielded r
r
Yielded l
l
Yielded d
d
As you can see from the output above, the producer is throttled and the handover buffer between the blocks never grows too large.
EDIT
You might find it cleaner to propagate completion via
producer.Completion.ContinueWith(
_ => consumer.Complete(), TaskContinuationOptions.ExecuteSynchronously
);
... right after the producer definition. This allows you to slightly reduce producer/consumer coupling - but at the end you still have to remember to observe Task.WhenAll(producer.Completion, consumer.Completion).
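Applied to the code in the question, the same pattern would look something like the sketch below, using the question's member names (here _processingBlock plays the role of the producer for _messageBufferBlock):
// When the processing block has handled every file, complete the message buffer.
_processingBlock.Completion.ContinueWith(
    _ => _messageBufferBlock.Complete(), TaskContinuationOptions.ExecuteSynchronously
);

// Observe the whole pipeline, e.g. from ProduceAsync:
await Task.WhenAll(_processingBlock.Completion, _messageBufferBlock.Completion);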