I have a fairly simple producer-consumer pattern where (simplified) I have two producers who produce output that is to be consumed by one consumer.
For this I use System.Threading.Tasks.Dataflow.BufferBlock<T>
A BufferBlock
object is created. One Consumer
is listening to this BufferBlock
, and processes any received input.
Two 'Producerssend data to the
BufferBlock` simultaneously
Simplified:
BufferBlock<int> bufferBlock = new BufferBlock<int>();
async Task Consume()
{
while(await bufferBlock.OutputAvailable())
{
int dataToProcess = await outputAvailable.ReceiveAsync();
Process(dataToProcess);
}
}
async Task Produce1()
{
IEnumerable<int> numbersToProcess = ...;
foreach (int numberToProcess in numbersToProcess)
{
await bufferBlock.SendAsync(numberToProcess);
// ignore result for this example
}
}
async Task Produce2()
{
IEnumerable<int> numbersToProcess = ...;
foreach (int numberToProcess in numbersToProcess)
{
await bufferBlock.SendAsync(numberToProcess);
// ignore result for this example
}
}
I'd like to start the Consumer first and then start the Producers as separate tasks:
var taskConsumer = Consume(); // do not await yet
var taskProduce1 = Task.Run( () => Produce1());
var taskProduce2 = Task.Run( () => Produce2());
// await until both producers are finished:
await Task.WhenAll(new Task[] {taskProduce1, taskProduce2});
bufferBlock.Complete(); // signal that no more data is expected in bufferBlock
// await for the Consumer to finish:
await taskConsumer;
At first glance, this is exactly how the producer-consumer was meant: several producers produce data while a consumer is consuming the produced data.
Yet, BufferBlock about thread safety says:
Any instance members are not guaranteed to be thread safe.
And I thought that the P in TPL meant Parallel! Should I worry? Is my code not thread safe? Is there a different TPL Dataflow class that I should use?
Yes, the BufferBlock
class is thread safe. I can't back this claim by pointing to an official document, because the "Thread Safety" section has been removed from the documentation. But I can see in the source that the class contains a lock object for synchronizing the incoming messages:
/// <summary>Gets the lock object used to synchronize incoming requests.</summary>
private object IncomingLock { get { return _source; } }
When the Post
extension method is called (source code), the explicitly implemented ITargetBlock.OfferMessage
method is invoked (source code). Below is an excerpt of this method:
DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader,
T messageValue, ISourceBlock<T> source, bool consumeToAccept)
{
//...
lock (IncomingLock)
{
//...
_source.AddMessage(messageValue);
//...
}
}
It would be strange indeed if this class, or any other XxxBlock
class included in the TPL Dataflow library, was not thread-safe. It would severely hamper the ease of use of this great library.
I think an ActionBlock<T>
would better suit what your doing since it has a built in buffer that many producers can send data in through. The default block options process the data on single background task but you can set a new value for parallelism and bounded capacity. With ActionBlock<T>
the main area of concern to ensure thread safety will be in the delegate you pass that processes each message. The operation of that function has to be independent of each message, i.e. not modifying shared state just like any Parrallel...
function.
public class ProducerConsumer
{
private ActionBlock<int> Consumer { get; }
public ProducerConsumer()
{
Consumer = new ActionBlock<int>(x => Process(x));
}
public async Task Start()
{
var producer1Tasks = Producer1();
var producer2Tasks = Producer2();
await Task.WhenAll(producer1Tasks.Concat(producer2Tasks));
Consumer.Complete();
await Consumer.Completion;
}
private void Process(int data)
{
// process
}
private IEnumerable<Task> Producer1() => Enumerable.Range(0, 100).Select(x => Consumer.SendAsync(x));
private IEnumerable<Task> Producer2() => Enumerable.Range(0, 100).Select(x => Consumer.SendAsync(x));
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With