I want to set up a TransformBlock
that processes its item in parallel. Thus, I'm setting ExecutionDataflowBlockOptions.MaxDegreeOfParallelism
to > 1. I don't care about the order of the messages but the documentation says:
When you specify a maximum degree of parallelism that is larger than 1, multiple messages are processed simultaneously, and therefore, messages might not be processed in the order in which they are received. The order in which the messages are output from the block will, however, be correctly ordered.
Does "correctly ordered" mean that if there is one message in the queue that needs long processing time, further messages are not output until this one message is processed?
And if so, how can I specify an Execution Block (for example a TransformBlock
) that does not care about the ordering? Or do I have to specify at the consumption end that I don't care about ordering?
There is no such block in the library, but you can easily create one yourself by combining an ActionBlock
and a BufferBlock
. Something like:
public static IPropagatorBlock<TInput, TOutput>
CreateUnorderedTransformBlock<TInput, TOutput>(
Func<TInput, TOutput> func, ExecutionDataflowBlockOptions options)
{
var buffer = new BufferBlock<TOutput>(options);
var action = new ActionBlock<TInput>(
async input =>
{
var output = func(input);
await buffer.SendAsync(output);
}, options);
action.Completion.ContinueWith(
t =>
{
IDataflowBlock castedBuffer = buffer;
if (t.IsFaulted)
{
castedBuffer.Fault(t.Exception);
}
else if (t.IsCanceled)
{
// do nothing: both blocks share options,
// which means they also share CancellationToken
}
else
{
castedBuffer.Complete();
}
});
return DataflowBlock.Encapsulate(action, buffer);
}
This way, once an item is processed by the ActionBlock
, it's immediately moved to the BufferBlock
, which means ordering is not maintained.
One issue with this code is that it doesn't observe the set BoundedCapacity
well: in effect, the capacity of this block is twice the capacity set in options (because each of the two blocks has a separate capacity).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With