Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Using async/await and yield return with TPL Dataflow

I am trying to implement a data processing pipeline using TPL Dataflow. However, I am relatively new to dataflow and not completely sure how to use it properly for the problem I am trying to solve.

Problem:

I am trying to iterate through the list of files and process each file to read some data and then further process that data. Each file is roughly 700MB to 1GB in size. Each file contains JSON data. In order to process these files in parallel and not run of of memory, I am trying to use IEnumerable<> with yield return and then further process the data.

Once I get list of files, I want to process maximum 4-5 files at a time in parallel. My confusion comes from:

  • How to use IEnumerable<> and yeild return with async/await and dataflow. Came across this answer by svick, but still not sure how to convert IEnumerable<> to ISourceBlock and then link all blocks together and track completion.
  • In my case, producer will be really fast (going through list of files), but consumer will be very slow (processing each file - read data, deserialize JSON). In this case, how to track completion.
  • Should I use LinkTo feature of datablocks to connect various blocks? or use method such as OutputAvailableAsync() and ReceiveAsync() to propagate data from one block to another.

Code:

private const int ProcessingSize= 4;
private BufferBlock<string> _fileBufferBlock;
private ActionBlock<string> _processingBlock;
private BufferBlock<DataType> _messageBufferBlock;

public Task ProduceAsync()
{
    PrepareDataflow(token);
    var bufferTask = ListFilesAsync(_fileBufferBlock, token);

    var tasks = new List<Task> { bufferTask, _processingBlock.Completion };
    return Task.WhenAll(tasks);
}

private async Task ListFilesAsync(ITargetBlock<string> targetBlock, CancellationToken token)
{
    ...
    // Get list of file Uris
    ...
    foreach(var fileNameUri in fileNameUris)
        await targetBlock.SendAsync(fileNameUri, token);

    targetBlock.Complete();
}

private async Task ProcessFileAsync(string fileNameUri, CancellationToken token)
{
    var httpClient = new HttpClient();
    try
    {
        using (var stream = await httpClient.GetStreamAsync(fileNameUri))
        using (var sr = new StreamReader(stream))
        using (var jsonTextReader = new JsonTextReader(sr))
        {
            while (jsonTextReader.Read())
            {
                if (jsonTextReader.TokenType == JsonToken.StartObject)
                {
                    try
                    {
                        var data = _jsonSerializer.Deserialize<DataType>(jsonTextReader)
                        await _messageBufferBlock.SendAsync(data, token);
                    }
                    catch (Exception ex)
                    {
                        _logger.Error(ex, $"JSON deserialization failed - {fileNameUri}");
                    }
                }
            }
        }
    }
    catch(Exception ex)
    {
        // Should throw?
        // Or if converted to block then report using Fault() method?
    }
    finally
    {
        httpClient.Dispose();
        buffer.Complete();
    }
}

private void PrepareDataflow(CancellationToken token)
{
    _fileBufferBlock = new BufferBlock<string>(new DataflowBlockOptions
    {
        CancellationToken = token
    });

    var actionExecuteOptions = new ExecutionDataflowBlockOptions
    {
        CancellationToken = token,
        BoundedCapacity = ProcessingSize,
        MaxMessagesPerTask = 1,
        MaxDegreeOfParallelism = ProcessingSize
    };
    _processingBlock = new ActionBlock<string>(async fileName =>
    {
        try
        {
            await ProcessFileAsync(fileName, token);
        }
        catch (Exception ex)
        {
            _logger.Fatal(ex, $"Failed to process fiel: {fileName}, Error: {ex.Message}");
            // Should fault the block?
        }
    }, actionExecuteOptions);

    _fileBufferBlock.LinkTo(_processingBlock, new DataflowLinkOptions { PropagateCompletion = true });

    _messageBufferBlock = new BufferBlock<DataType>(new ExecutionDataflowBlockOptions
    {
        CancellationToken = token,
        BoundedCapacity = 50000
    });
    _messageBufferBlock.LinkTo(DataflowBlock.NullTarget<DataType>());
}

In the above code, I am not using IEnumerable<DataType> and yield return as I cannot use it with async/await. So I am linking input buffer to ActionBlock<DataType> which in turn posts to another queue. However by using ActionBlock<>, I cannot link it to next block for processing and have to manually Post/SendAsync from ActionBlock<> to BufferBlock<>. Also, in this case, not sure, how to track completion.

This code works, but, I am sure there could be better solution then this and I can just link all the block (instead of ActionBlock<DataType> and then sending messages from it to BufferBlock<DataType>)

Another option could be to convert IEnumerable<> to IObservable<> using Rx, but again I am not much familiar with Rx and don't know exactly how to mix TPL Dataflow and Rx

like image 517
Tejas Vora Avatar asked Feb 12 '16 20:02

Tejas Vora


People also ask

Is async await part of TPL?

Await & Async was built on the Task Parallel Library (TPL) which was introduced in the . NET Framework 4. Their purpose is to enable asynchronous programming.

Is yield return async?

Using an async yield return statement requires that the method be asynchronous, making use of async/await. Usually an async method will return a task. Your first thought when using yield return in your async method may be to have the method return Task of IEnumerable.

What is TPL Dataflow?

The TPL Dataflow Library consists of dataflow blocks, which are data structures that buffer and process data. The TPL defines three kinds of dataflow blocks: source blocks, target blocks, and propagator blocks. A source block acts as a source of data and can be read from.

Can async function return void?

Event handlers naturally return void, so async methods return void so that you can have an asynchronous event handler.


1 Answers

Question 1

You plug an IEnumerable<T> producer into your TPL Dataflow chain by using Post or SendAsync directly on the consumer block, as follows:

foreach (string fileNameUri in fileNameUris)
{
    await _processingBlock.SendAsync(fileNameUri).ConfigureAwait(false);
}

You can also use a BufferBlock<TInput>, but in your case it actually seems rather unnecessary (or even harmful - see the next part).

Question 2

When would you prefer SendAsync instead of Post? If your producer runs faster than the URIs can be processed (and you have indicated this to be the case), and you choose to give your _processingBlock a BoundedCapacity, then when the block's internal buffer reaches the specified capacity, your SendAsync will "hang" until a buffer slot frees up, and your foreach loop will be throttled. This feedback mechanism creates back pressure and ensures that you don't run out of memory.

Question 3

You should definitely use the LinkTo method to link your blocks in most cases. Unfortunately yours is a corner case due to the interplay of IDisposable and very large (potentially) sequences. So your completion will flow automatically between the buffer and processing blocks (due to LinkTo), but after that - you need to propagate it manually. This is tricky, but doable.

I'll illustrate this with a "Hello World" example where the producer iterates over each character and the consumer (which is really slow) outputs each character to the Debug window.

Note: LinkTo is not present.

// REALLY slow consumer.
var consumer = new ActionBlock<char>(async c =>
{
    await Task.Delay(100);

    Debug.Print(c.ToString());
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

var producer = new ActionBlock<string>(async s =>
{
    foreach (char c in s)
    {
        await consumer.SendAsync(c);

        Debug.Print($"Yielded {c}");
    }
});

try
{
    producer.Post("Hello world");
    producer.Complete();

    await producer.Completion;
}
finally
{
    consumer.Complete();
}

// Observe combined producer and consumer completion/exceptions/cancellation.
await Task.WhenAll(producer.Completion, consumer.Completion);

This outputs:

Yielded H
H
Yielded e
e
Yielded l
l
Yielded l
l
Yielded o
o
Yielded  

Yielded w
w
Yielded o
o
Yielded r
r
Yielded l
l
Yielded d
d

As you can see from the output above, the producer is throttled and the handover buffer between the blocks never grows too large.

EDIT

You might find it cleaner to propagate completion via

producer.Completion.ContinueWith(
    _ => consumer.Complete(), TaskContinuationOptions.ExecuteSynchronously
);

... right after producer definition. This allows you to slightly reduce producer/consumer coupling - but at the end you still have to remember to observe Task.WhenAll(producer.Completion, consumer.Completion).

like image 57
Kirill Shlenskiy Avatar answered Oct 13 '22 20:10

Kirill Shlenskiy