
How can I create a constant Processing "Flow" using the TPL in C# 4

I'm not sure if the following is possible, but I'd like to invoke a number of Actions in parallel, in a throttled manner, while keeping the flow of processing continuous, without reverting to timers or loop/sleep cycles.

So far I've got it working so that it loads a large batch of inputs from some source, processes them in parallel in a controlled way, and then loops around, like below.

static void Main(string[] args)
{
    while(true) //Simulate a Timer Elapsing...
    {
        IEnumerable<int> inputs = new List<int>() {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};  
        //Simulate querying database queue tables for next batch of entries

        RunAllActions(inputs, 3); //Max 3 at a time.
    }
}

static void RunAllActions(IEnumerable<int> inputs, int maxConcurrency)
{
    var options = new ParallelOptions() {MaxDegreeOfParallelism = maxConcurrency};

    Parallel.ForEach<int>(inputs, options, DoWork);
    //Blocks here until all inputs are processed.
    Console.WriteLine("Batch of Work Done!!!");
}

static void DoWork(int input)
{
    Console.WriteLine("Starting Task {0}", input);
    System.Threading.Thread.Sleep(3000);
    Console.WriteLine("Finishing Task {0}", input);
}

What I'd like to know is: is there a construct in the TPL that I could use to keep it always running, so that I can replace the "Timer Elapsing" and "Database Polling" with a MessageQueue Received event?

The following is a rough version of what I'd like to achieve. There are other ways I could go about it, but I want to know whether this sort of pattern is built into the TPL.

internal class Engine
{
    private MessageQueue mq;
    private Queue<int> myInternalApplicationQueue; //NB: Queue<T> isn't thread-safe; real code would need locking or a concurrent collection

    public Engine()
    {
        //Message Queue to get new task inputs from
        mq = new MessageQueue();
        mq.ReceiveCompleted += new ReceiveCompletedEventHandler(mq_ReceiveCompleted);

        // internal Queue to put them in.
        myInternalApplicationQueue = new Queue<int>();
    }

    void mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
    {
        //On MQ Receive, pop the input in a queue in my app
        int input = (int) e.Message.Body;

        myInternalApplicationQueue.Enqueue(input);
    }

    public void StartWorking()
    {
        //Once this gets called, it doesn't stop... it just keeps processing/watching that queue
        //processing the tasks as fast as it's allowed while the app is running.
        var options = new ParallelOptions() { MaxDegreeOfParallelism = 3 };
        Parallel.KeepWorkingOnQueue<int>(myInternalApplicationQueue, options, DoWork);
        //       ^^^^^^^^^^^^^^^^^^ <----- THIS GUY
    }

}
Asked Feb 10 '12 by Eoin Campbell


1 Answer

You can use BlockingCollection<T> to handle this type of operation, which is effectively a producer/consumer scenario.

Basically, you'd set up a BlockingCollection<T> and use it as your "producer". You would then have three (or any number of) consumer tasks (which are often set up as long-running tasks) that process elements by calling blockingCollection.GetConsumingEnumerable() in a standard foreach loop.

You then add items to the collection as needed, and they will be processed continually. When you are completely done, you call BlockingCollection<T>.CompleteAdding, which causes the foreach loops to complete, and the entire thing stops.
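A minimal sketch of that setup, mapped onto the question's scenario (the names, the item count, and the console output are illustrative; a real MessageQueue ReceiveCompleted handler would call queue.Add in place of the producer loop):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        var queue = new BlockingCollection<int>();
        int processed = 0;
        const int consumerCount = 3; // throttle: at most 3 items in flight

        // Start three long-running consumer tasks. Each iterates the
        // consuming enumerable sequentially, so items are picked up the
        // moment they are added - no batching, no polling.
        var consumers = new Task[consumerCount];
        for (int i = 0; i < consumerCount; i++)
        {
            consumers[i] = Task.Factory.StartNew(() =>
            {
                foreach (int input in queue.GetConsumingEnumerable())
                {
                    Console.WriteLine("Processing {0}", input);
                    Interlocked.Increment(ref processed);
                }
            }, TaskCreationOptions.LongRunning);
        }

        // Producer side: in the question's Engine class, the
        // mq_ReceiveCompleted handler would do queue.Add(input) here.
        for (int input = 1; input <= 10; input++)
            queue.Add(input);

        queue.CompleteAdding();  // lets the foreach loops finish
        Task.WaitAll(consumers); // blocks until the queue drains
        Console.WriteLine("Processed {0} items", processed);
    }
}
```

Because GetConsumingEnumerable blocks while the collection is empty, the consumers simply wait for the next message rather than spinning or sleeping, which is the "constant flow" the question asks for.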

As a side note: you typically do not want to use Parallel.ForEach on the GetConsumingEnumerable() of a BlockingCollection<T>, at least not unless you handle the partitioning yourself. It's usually better to use multiple tasks and have each iterate sequentially. The reason is that the default partitioning scheme in Parallel.ForEach causes problems: it waits until a "chunk" of data is available instead of processing items immediately, and the chunks grow larger and larger over time.
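For completeness: if you do want Parallel.ForEach over the consuming enumerable, .NET 4.5 later added a partitioner option that disables the chunking behaviour described above. Note this is not available in the .NET 4 / C# 4 the question targets; the sketch below assumes .NET 4.5+:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        var queue = new BlockingCollection<int>();
        int processed = 0;

        var consumer = Task.Run(() =>
        {
            // NoBuffering makes Parallel.ForEach hand each item to a
            // worker as soon as it arrives, instead of building chunks.
            var partitioner = Partitioner.Create(
                queue.GetConsumingEnumerable(),
                EnumerablePartitionerOptions.NoBuffering);

            Parallel.ForEach(partitioner,
                new ParallelOptions { MaxDegreeOfParallelism = 3 },
                input => Interlocked.Increment(ref processed));
        });

        for (int i = 1; i <= 10; i++)
            queue.Add(i);

        queue.CompleteAdding();
        consumer.Wait();
        Console.WriteLine("Processed {0} items", processed);
    }
}
```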

Answered Oct 01 '22 by Reed Copsey