 

How can I specify an unordered Execution Block using the TPL Dataflow Library?

I want to set up a TransformBlock that processes its items in parallel. Thus, I'm setting ExecutionDataflowBlockOptions.MaxDegreeOfParallelism to a value greater than 1. I don't care about the order of the messages, but the documentation says:

When you specify a maximum degree of parallelism that is larger than 1, multiple messages are processed simultaneously, and therefore, messages might not be processed in the order in which they are received. The order in which the messages are output from the block will, however, be correctly ordered.

Does "correctly ordered" mean that if there is one message in the queue that needs long processing time, further messages are not output until this one message is processed?

And if so, how can I specify an Execution Block (for example a TransformBlock) that does not care about the ordering? Or do I have to specify at the consumption end that I don't care about ordering?
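A minimal sketch of the behavior the documentation describes, assuming default block options apart from MaxDegreeOfParallelism (the class name `OrderedDemo` and the delay values are illustrative only). Item 1 is made slow; items 2, 3 and 4 finish first, but the TransformBlock holds them back until item 1 is done, so the collected output is 1, 2, 3, 4.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class OrderedDemo
{
    // Runs four items through a TransformBlock with MaxDegreeOfParallelism = 4.
    public static async Task<List<int>> RunAsync()
    {
        var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };
        var transform = new TransformBlock<int, int>(async n =>
        {
            // Only item 1 is slow; the delays are arbitrary illustration values.
            await Task.Delay(n == 1 ? 500 : 10);
            return n;
        }, options);

        // The collector runs with the default MaxDegreeOfParallelism of 1,
        // so adding to a plain List is safe here.
        var outputs = new List<int>();
        var collector = new ActionBlock<int>(n => outputs.Add(n));
        transform.LinkTo(collector, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i <= 4; i++)
        {
            transform.Post(i);
        }
        transform.Complete();
        await collector.Completion;
        return outputs; // emitted in input order, even though 2..4 finished first
    }

    public static async Task Main()
    {
        var outputs = await RunAsync();
        Console.WriteLine(string.Join(", ", outputs)); // prints "1, 2, 3, 4"
    }
}
```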

Dejan asked Apr 06 '14 12:04



1 Answer

There is no such block in the library, but you can easily create one yourself by combining an ActionBlock and a BufferBlock. Something like:

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static IPropagatorBlock<TInput, TOutput>
    CreateUnorderedTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> func, ExecutionDataflowBlockOptions options)
{
    // Receives results in the order they finish, not the order they arrived.
    var buffer = new BufferBlock<TOutput>(options);
    // Does the (parallel) work and pushes each result into the buffer.
    var action = new ActionBlock<TInput>(
        async input =>
        {
            var output = func(input);
            await buffer.SendAsync(output);
        }, options);

    // Forward the ActionBlock's completion or fault to the BufferBlock.
    action.Completion.ContinueWith(
        t =>
        {
            IDataflowBlock castedBuffer = buffer;

            if (t.IsFaulted)
            {
                castedBuffer.Fault(t.Exception);
            }
            else if (t.IsCanceled)
            {
                // do nothing: both blocks share options,
                // which means they also share CancellationToken
            }
            else
            {
                castedBuffer.Complete();
            }
        }, TaskScheduler.Default);

    return DataflowBlock.Encapsulate(action, buffer);
}

This way, once an item is processed by the ActionBlock, it's immediately moved to the BufferBlock, which means ordering is not maintained.
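A short usage sketch of the helper above; it repeats a compact copy of the helper so the file compiles on its own (the class name `UnorderedDemo` and the sleep durations are hypothetical). With MaxDegreeOfParallelism = 4, the deliberately slow item 1 is expected to come out last instead of holding back the others.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class UnorderedDemo
{
    // Compact copy of the helper from the answer.
    public static IPropagatorBlock<TInput, TOutput> CreateUnorderedTransformBlock<TInput, TOutput>(
        Func<TInput, TOutput> func, ExecutionDataflowBlockOptions options)
    {
        var buffer = new BufferBlock<TOutput>(options);
        var action = new ActionBlock<TInput>(async input =>
        {
            var output = func(input);
            await buffer.SendAsync(output);
        }, options);

        action.Completion.ContinueWith(t =>
        {
            IDataflowBlock castedBuffer = buffer;
            if (t.IsFaulted) castedBuffer.Fault(t.Exception);
            else if (!t.IsCanceled) castedBuffer.Complete(); // cancellation is shared via options
        }, TaskScheduler.Default);

        return DataflowBlock.Encapsulate(action, buffer);
    }

    public static async Task<List<int>> RunAsync()
    {
        var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };
        var block = CreateUnorderedTransformBlock<int, int>(n =>
        {
            Thread.Sleep(n == 1 ? 500 : 10); // item 1 is deliberately slow
            return n;
        }, options);

        var outputs = new List<int>();
        var collector = new ActionBlock<int>(n => outputs.Add(n)); // single-threaded
        block.LinkTo(collector, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i <= 4; i++) block.Post(i);
        block.Complete();
        await collector.Completion;
        return outputs; // item 1 is expected last; the order of 2..4 may vary
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(", ", await RunAsync()));
    }
}
```

The relative order of items 2, 3 and 4 depends on scheduling; only the position of the slow item is predictable.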

One issue with this code is that it doesn't respect BoundedCapacity precisely: because the ActionBlock and the BufferBlock each enforce their own copy of that limit, the combined block can hold roughly twice as many items as the capacity set in options.
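Newer releases of System.Threading.Tasks.Dataflow add a `DataflowBlockOptions.EnsureOrdered` property (default `true`). Setting it to `false` lets a plain TransformBlock release each result as soon as it is ready, so the two-block workaround becomes unnecessary, and because only one block is involved, BoundedCapacity is enforced by that single block. A minimal sketch, assuming a package version that exposes this property (`EnsureOrderedDemo` and the timings are hypothetical):

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class EnsureOrderedDemo
{
    public static async Task<List<int>> RunAsync()
    {
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
            EnsureOrdered = false // emit results in completion order
        };
        var transform = new TransformBlock<int, int>(n =>
        {
            Thread.Sleep(n == 1 ? 500 : 10); // item 1 is deliberately slow
            return n;
        }, options);

        var outputs = new List<int>();
        var collector = new ActionBlock<int>(n => outputs.Add(n)); // single-threaded
        transform.LinkTo(collector, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i <= 4; i++) transform.Post(i);
        transform.Complete();
        await collector.Completion;
        return outputs; // item 1 is expected last
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(", ", await RunAsync()));
    }
}
```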

svick answered Oct 13 '22 08:10
