 

Is TPL Dataflow BufferBlock thread safe?


I have a fairly simple producer-consumer pattern where (simplified) I have two producers who produce output that is to be consumed by one consumer.

For this I use System.Threading.Tasks.Dataflow.BufferBlock<T>.

A single BufferBlock object is created. One consumer listens to this BufferBlock and processes any input it receives.

Two producers send data to the BufferBlock simultaneously.

Simplified:

BufferBlock<int> bufferBlock = new BufferBlock<int>();

async Task Consume()
{
    while(await bufferBlock.OutputAvailableAsync())
    {
         int dataToProcess = await bufferBlock.ReceiveAsync();
         Process(dataToProcess);
    }
}

async Task Produce1()
{
    IEnumerable<int> numbersToProcess = ...;
    foreach (int numberToProcess in numbersToProcess)
    {
         await bufferBlock.SendAsync(numberToProcess);
         // ignore result for this example
    }
}

async Task Produce2()
{
    IEnumerable<int> numbersToProcess = ...;
    foreach (int numberToProcess in numbersToProcess)
    {
         await bufferBlock.SendAsync(numberToProcess);
         // ignore result for this example
    }
}

I'd like to start the Consumer first and then start the Producers as separate tasks:

var taskConsumer = Consume(); // do not await yet
var taskProduce1 = Task.Run( () => Produce1());
var taskProduce2 = Task.Run( () => Produce2());

// await until both producers are finished:
await Task.WhenAll(new Task[] {taskProduce1, taskProduce2});
bufferBlock.Complete(); // signal that no more data is expected in bufferBlock

// await for the Consumer to finish:
await taskConsumer;
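For anyone who wants to try this, below is a self-contained sketch of the same pattern, with the consumer calling OutputAvailableAsync and ReceiveAsync on bufferBlock. The TwoProducerDemo class, the RunAsync method and the itemsPerProducer parameter are hypothetical names added only so the sketch compiles on its own; it assumes a reference to the System.Threading.Tasks.Dataflow package. Producer 1 sends 0 to itemsPerProducer - 1 and producer 2 sends the next itemsPerProducer numbers, so the result can be checked for missing, duplicated or reordered items. A clean run is consistent with the block being thread-safe, but it is not a proof.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical wrapper class, only here to make the sketch self-contained.
public static class TwoProducerDemo
{
    // Runs two producers and one consumer against a shared BufferBlock<int>
    // and returns everything the consumer received, in arrival order.
    public static async Task<List<int>> RunAsync(int itemsPerProducer)
    {
        var bufferBlock = new BufferBlock<int>();
        var received = new List<int>(); // only touched by the single consumer

        async Task Consume()
        {
            // OutputAvailableAsync returns false once the block is completed and empty.
            while (await bufferBlock.OutputAvailableAsync())
            {
                received.Add(await bufferBlock.ReceiveAsync());
            }
        }

        async Task Produce(int offset)
        {
            for (int i = 0; i < itemsPerProducer; i++)
            {
                // SendAsync returns Task<bool>; false would mean the item was declined.
                bool accepted = await bufferBlock.SendAsync(offset + i);
                if (!accepted) throw new InvalidOperationException("Item was declined.");
            }
        }

        Task consumer = Consume();                                     // start the consumer first
        Task producer1 = Task.Run(() => Produce(0));                   // 0 .. n-1
        Task producer2 = Task.Run(() => Produce(itemsPerProducer));    // n .. 2n-1

        await Task.WhenAll(producer1, producer2);
        bufferBlock.Complete(); // no more data will be sent
        await consumer;
        return received;
    }
}
```

For example, `await TwoProducerDemo.RunAsync(1000)` should yield 2000 items, with each producer's numbers appearing in the order that producer sent them.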

At first glance, this is exactly how the producer-consumer pattern is meant to work: several producers produce data while a consumer consumes it.

Yet the Thread Safety section of the BufferBlock documentation says:

Any instance members are not guaranteed to be thread safe.

And I thought that the P in TPL meant Parallel! Should I worry? Is my code not thread safe? Is there a different TPL Dataflow class that I should use?

asked Jun 18 '18 14:06 by Harald Coppoolse


2 Answers

Yes, the BufferBlock class is thread safe. I can't back this claim by pointing to an official document, because the "Thread Safety" section has been removed from the documentation. But I can see in the source that the class contains a lock object for synchronizing the incoming messages:

/// <summary>Gets the lock object used to synchronize incoming requests.</summary>
private object IncomingLock { get { return _source; } }

When the Post extension method is called, it invokes the explicitly implemented ITargetBlock<T>.OfferMessage method (both are visible in the .NET source code). Below is an excerpt of this method:

DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader,
    T messageValue, ISourceBlock<T> source, bool consumeToAccept)
{
    //...
    lock (IncomingLock)
    {
        //...
        _source.AddMessage(messageValue);
        //...
    }
}

It would be strange indeed if this class, or any other XxxBlock class included in the TPL Dataflow library, was not thread-safe. It would severely hamper the ease of use of this great library.
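The same claim can be exercised with a quick stress test: many thread-pool threads call Post concurrently on one unbounded BufferBlock<int>, and the block is then drained with TryReceiveAll. This is a minimal sketch; BufferBlockStressTest and PostConcurrently are hypothetical names, and passing such a test only shows that no items were lost or duplicated in that particular run. It does not replace the lock in the source as evidence.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class for the stress test.
public static class BufferBlockStressTest
{
    // Posts 0..count-1 from many thread-pool threads at once, then drains the block.
    // If Post were not thread-safe, items could go missing or be duplicated.
    public static IList<int> PostConcurrently(int count)
    {
        var block = new BufferBlock<int>();

        Parallel.For(0, count, i =>
        {
            // Post on an unbounded BufferBlock is expected to always accept the item.
            if (!block.Post(i)) throw new InvalidOperationException($"Post({i}) was declined.");
        });

        // TryReceiveAll removes every buffered item in one call (items is null if the block was empty).
        block.TryReceiveAll(out IList<int> items);
        return items ?? new List<int>();
    }
}
```

Calling `BufferBlockStressTest.PostConcurrently(100_000)` should return 100,000 distinct items, in whatever order the threads managed to post them.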

answered Sep 28 '22 08:09 by Theodor Zoulias


I think an ActionBlock<T> would better suit what you're doing, since it has a built-in buffer that many producers can send data into. With the default block options the data is processed on a single background task, but you can set new values for the degree of parallelism (MaxDegreeOfParallelism) and the bounded capacity (BoundedCapacity). With ActionBlock<T>, the main thread-safety concern is the delegate you pass to process each message: it has to handle each message independently, i.e. not modify shared state, just like the delegate passed to any Parallel.* method.
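A sketch of those options, where the values 4 and 100 are arbitrary placeholders to tune: MaxDegreeOfParallelism and BoundedCapacity are set through ExecutionDataflowBlockOptions, and the only shared state the delegate touches is updated with Interlocked.Add, so parallel invocations cannot lose updates. ParallelActionBlockDemo and SumAsync are hypothetical names used for illustration.

```csharp
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class: sums 1..n with an ActionBlock fed by two producers.
public static class ParallelActionBlockDemo
{
    public static async Task<long> SumAsync(int n)
    {
        long total = 0; // shared state: only ever updated through Interlocked

        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4, // up to 4 messages processed at the same time
            BoundedCapacity = 100       // at most 100 items buffered or in progress
        };

        // Atomic update, so concurrent invocations of the delegate are safe.
        var consumer = new ActionBlock<int>(x => Interlocked.Add(ref total, x), options);

        async Task Produce(int from, int to)
        {
            for (int i = from; i <= to; i++)
            {
                // With BoundedCapacity set, SendAsync waits for room instead of declining.
                await consumer.SendAsync(i);
            }
        }

        int half = n / 2;
        await Task.WhenAll(
            Task.Run(() => Produce(1, half)),
            Task.Run(() => Produce(half + 1, n)));

        consumer.Complete();        // no more input
        await consumer.Completion;  // wait until every message has been processed
        return Interlocked.Read(ref total);
    }
}
```

`await ParallelActionBlockDemo.SumAsync(10_000)` should return 50005000. Because BoundedCapacity is set, SendAsync makes a producer wait while the block is full instead of letting the buffer grow without limit; Post would return false in that situation.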

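using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class ProducerConsumer
{
    private ActionBlock<int> Consumer { get; }

    public ProducerConsumer()
    {
        Consumer = new ActionBlock<int>(x => Process(x));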
    }

    public async Task Start()
    {
        var producer1Tasks = Producer1();
        var producer2Tasks = Producer2();
        await Task.WhenAll(producer1Tasks.Concat(producer2Tasks));
        Consumer.Complete();
        await Consumer.Completion;
    }

    private void Process(int data)
    {
        // process
    }

    private IEnumerable<Task> Producer1() => Enumerable.Range(0, 100).Select(x => Consumer.SendAsync(x));

    private IEnumerable<Task> Producer2() => Enumerable.Range(0, 100).Select(x => Consumer.SendAsync(x));
}

answered Sep 28 '22 07:09 by JSteward