Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is this a job for TPL Dataflow?

I run a pretty typical producer/consumer model on different tasks.

Task1: Reads batches of byte[] from binary files and kicks off a new task for each collection of byte arrays. (the operation is batched for memory management purposes).

Task 2-n: Those are worker tasks and each operates on the passed-in collection (from Tasks1) of byte arrays and de-serializes byte arrays, sorts them by certain criteria, and then stores a collection of resulting objects (each byte array deserializes into such object) in a Concurrent Dictionary.

Task (n+1) I chose a concurrent dictionary because the job of this task is to merge those collections that are stored in the concurrent dictionary in the same order than how they originated from Task1. I achieve that by passing a collectionID (it is of type int and incremented for each new collection within Task1) all the way down from Task1 to this task. This task basically checks whether the next expected collectionID is already stored in the concurrent dictionary and if yes, takes it out, adds it to a Final Queue and checks for the next collection in the concurrent dictionary.

Now, from what I have read and the videos I watched it seems to me that TPL Dataflow may be the perfect candidate for such producer/consumer model. I just do not seem to be able to devise a design and thus get started because I have never worked with TPL Dataflow. In terms of throughput and latency is this library even up to the task? I currently process 2.5 million byte arrays and thus objects per second in the resulting collections. Can TPL Dataflow help to simplify? I am especially interested in the answer to the following question: Can TPL Dataflow preserve the order of collection batches from Task1 when spawning off worker tasks and re-merging them once the worker tasks have done their work? Does it optimize things? Having profiled the whole structure I feel there is quite some time wasted due to spinning and too many concurrent collections involved.

Any ideas, thoughts?

like image 688
Matt Avatar asked Jun 15 '12 14:06

Matt


People also ask

Why use TPL DataFlow?

The Task Parallel Library (TPL) provides dataflow components to help increase the robustness of concurrency-enabled applications.

What is ActionBlock?

One of them is ActionBlock. This class is part of the DataFlow class. This class is used to create a process. Not in our discussion, but as far as you can specify a flow that will eventually run Concurrency and Async. With the ActionBlock class, you can specify a task and specify its synchronization settings.


1 Answers

EDIT: Turns out I was very wrong. TransformBlock does return items in the same order they came in, even if it is configured for parallelism. Because of that, the code in my original answer is completely useless and normal TransformBlock can be used instead.


Original answer:

As far as I know only one parallelism construct in .Net supports returning processed items in the order they came in: PLINQ with AsOrdered(). But it seems to me that PLINQ doesn't fit what you want well.

TPL Dataflow, on the other hand, fits well, I think, but it doesn't have a block that would support parallelism and returning items in order at the same time (TransformBlock supports both of them, but not at the same time). Fortunately, Dataflow blocks were designed with composability in mind, so we can build our own block that does that.

But first, we have to figure out how to order the results. Using a concurrent dictionary, like you suggested, along with some synchronization mechanism, would certainly work. But I think there is a simpler solution: use a queue of Tasks. In the output task, you dequeue a Task, wait for it to complete (asynchronously) and when it does, you send its result along. We still need some synchronization for the case when the queue is empty, but we can get that for free if we choose which queue to use cleverly.

So, the general idea is like this: what we're writing will be an IPropagatorBlock, with some input and some output. The easiest way to create a custom IPropagatorBlock is to create one block that processes the input, another block that produces the results and treat them as one using DataflowBlock.Encapsulate().

The input block will have to process the incoming items in the correct order, so no parallelization there. It will create a new Task (actually, a TaskCompletionSource, so that we can set the result of the Task later), add it to the queue and then send the item for processing, along with some way to set the result of the correct Task. Because we don't need to link this block to anything, we can use an ActionBlock.

The output block will have to take Tasks from the queue, asynchronously wait for them, and then send them along. But since all blocks have a queue embedded in them, and blocks that take delegates have asynchronous waiting built-in, this will be very simple: new TransformBlock<Task<TOutput>, TOutput>(t => t). This block will work both as the queue and as the output block. Because of this, we don't have to deal with any synchronization.

The last piece of the puzzle is actually processing the items in parallel. For this, we can use another ActionBlock, this time with MaxDegreeOfParallelism set. It will take the input, process it, and set the result of the correct Task in the queue.

Put together, it could look like this:

public static IPropagatorBlock<TInput, TOutput>
    CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> transform)
{
    var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);

    var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
        tuple => tuple.Item2(transform(tuple.Item1)),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    var enqueuer = new ActionBlock<TInput>(
        async item =>
        {
            var tcs = new TaskCompletionSource<TOutput>();
            await processor.SendAsync(
                new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
            await queue.SendAsync(tcs.Task);
        });

    enqueuer.Completion.ContinueWith(
        _ =>
        {
            queue.Complete();
            processor.Complete();
        });

    return DataflowBlock.Encapsulate(enqueuer, queue);
}

After so much talk, that's quite a small amount of code, I think.

It seems you care about performance a lot, so you might need to fine tune this code. For example, it might make sense to set MaxDegreeOfParallelism of the processor block to something like Environment.ProcessorCount, to avoid oversubscription. Also, if latency is more important than throughput to you, it might make sense to set MaxMessagesPerTask of the same block to 1 (or another small number) so that when processing of an item is finished, it's sent to the output immediately.

Also, if you want to throttle incoming items, you could set BoundedCapacity of enqueuer.

like image 117
svick Avatar answered Sep 22 '22 11:09

svick