
TPL architectural question

I'm currently working on a project where we have the challenge of processing items in parallel. So far, not a big deal ;) Now to the problem: we have a list of IDs for which we periodically (every 2 seconds) want to call a stored procedure, once per ID. The 2-second interval needs to be tracked for each item individually, as items are added and removed at runtime. In addition, we want to configure the maximum degree of parallelism, as the DB should not be flooded with 300 concurrent threads. An item that is being processed should not be rescheduled until its previous execution has finished; the reason is that we want to avoid queueing up a lot of items in case of delays on the DB.

Right now we are using a self-developed component that has a main thread which periodically checks which items need to be scheduled for processing. Once it has the list, it drops those onto a custom IOCP-based thread pool and then uses wait handles to wait for the items to be processed. Then the next iteration starts. We chose IOCP because of the work stealing it provides.

I would like to replace this custom implementation with a TPL/.NET 4 version, and I would like to know how you would solve it (ideally something simple and nicely readable/maintainable). I know about this article: http://msdn.microsoft.com/en-us/library/ee789351.aspx, but it only limits the number of threads being used. That still leaves work stealing, periodically executing the items, and so on.

Ideally it will become a generic component that can be used for all tasks that need to be done periodically for a list of items.

any input welcome, tia Martin

asked Jun 10 '11 by Martin Moser


2 Answers

I don't think you actually need to get down and dirty with direct TPL Tasks for this. For starters I would set up a BlockingCollection around a ConcurrentQueue (the default) with no BoundedCapacity set on the BlockingCollection to store the IDs that need to be processed.

// Setup the blocking collection somewhere when your process starts up (OnStart for a Windows service)
BlockingCollection<string> idsToProcess = new BlockingCollection<string>();

From there I would just use Parallel::ForEach on the enumeration returned from BlockingCollection::GetConsumingEnumerable. In the ForEach call you set up your ParallelOptions::MaxDegreeOfParallelism, and inside the body of the ForEach you execute your stored procedure.

Now, once the stored procedure execution completes, you're saying you don't want to re-schedule the execution for at least two seconds. No problem: schedule a System.Threading.Timer with a callback that simply adds the ID back to the BlockingCollection.

Parallel.ForEach(
    idsToProcess.GetConsumingEnumerable(),
    new ParallelOptions 
    { 
        MaxDegreeOfParallelism = 4 // read this from config
    },
    (id) =>
    {
       // ... execute sproc ...

       // Need to declare/assign this before the delegate so that we can dispose of it inside 
       Timer timer = null;

       timer = new Timer(
           _ =>
           {
               // Add the id back to the collection so it will be processed again
               idsToProcess.Add(id);

               // Cleanup the timer
               timer.Dispose();
           },
           null, // no state, the id we need is "captured" in the anonymous delegate
           2000, // probably should read this from config
           Timeout.Infinite);
    });

Finally, when the process is shutting down you would call BlockingCollection::CompleteAdding so that the enumerable being processed will stop blocking and complete, and the Parallel::ForEach will exit. If this were a Windows service, for example, you would do this in OnStop.

// When ready to shutdown you just signal you're done adding
idsToProcess.CompleteAdding();
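To make the lifecycle concrete, here is a minimal sketch of how the pieces might be wired into a Windows service host. The class name and the loop body are illustrative placeholders, not part of the answer above:

```csharp
// Hypothetical Windows service wiring; ExecuteSproc etc. are placeholders.
public class IdProcessingService : ServiceBase
{
    private BlockingCollection<string> idsToProcess;
    private Task processingLoop;

    protected override void OnStart(string[] args)
    {
        idsToProcess = new BlockingCollection<string>();

        // Run the consuming loop on its own long-running task so OnStart returns promptly
        processingLoop = Task.Factory.StartNew(() =>
            Parallel.ForEach(
                idsToProcess.GetConsumingEnumerable(),
                new ParallelOptions { MaxDegreeOfParallelism = 4 },
                id =>
                {
                    // ... execute sproc, then schedule the id to come back in 2s ...
                }),
            TaskCreationOptions.LongRunning);
    }

    protected override void OnStop()
    {
        // Unblocks GetConsumingEnumerable once the collection drains
        idsToProcess.CompleteAdding();
        processingLoop.Wait();
    }
}
```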

Update

You raised a valid concern in your comment that you might be processing a large amount of IDs at any given point and fear that there would be too much overhead in a timer per ID. I would absolutely agree with that. So in the case that you are dealing with a large list of IDs concurrently, I would change from using a timer-per-ID to using another queue to hold the "sleeping" IDs which is monitored by a single short interval timer instead. First you'll need a ConcurrentQueue onto which to place the IDs that are asleep:

ConcurrentQueue<Tuple<string, DateTime>> sleepingIds = new ConcurrentQueue<Tuple<string, DateTime>>();

Now, I'm using a two-part Tuple here for illustration purposes, but you may want to create a more strongly typed struct for it (or at least alias it with a using statement) for better readability. The tuple has the id and a DateTime which represents when it was put on the queue.
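For example, a small struct along these lines (the name `SleepingId` is purely illustrative) reads better than `Tuple<string, DateTime>`:

```csharp
// Illustrative replacement for Tuple<string, DateTime>; the name is arbitrary
public struct SleepingId
{
    public readonly string Id;
    public readonly DateTime SleptAtUtc;

    public SleepingId(string id, DateTime sleptAtUtc)
    {
        Id = id;
        SleptAtUtc = sleptAtUtc;
    }
}

// The queue declaration then becomes self-describing:
// ConcurrentQueue<SleepingId> sleepingIds = new ConcurrentQueue<SleepingId>();
```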

Now you'll also want to setup the timer that will monitor this queue:

Timer wakeSleepingIdsTimer = new Timer(
   _ =>
   {
       DateTime utcNow = DateTime.UtcNow;
       Tuple<string, DateTime> entry;

       // Dequeue every item that has been asleep for at least 2 seconds.
       // The queue is FIFO, so we can stop as soon as the head entry is too young.
       while (sleepingIds.TryPeek(out entry)
              && (utcNow - entry.Item2).TotalSeconds >= 2
              && sleepingIds.TryDequeue(out entry))
       {
           // Add this id back to the processing collection
           idsToProcess.Add(entry.Item1);
       }
   },
   null, // no state
   0,    // start immediately (a due time of Timeout.Infinite would never fire)
   100   // wake up every 100ms, probably should read this from config
 );

Then you would simply change the body of the Parallel::ForEach to do the following instead of setting up a timer for each id:

(id) =>
{
       // ... execute sproc ...

       sleepingIds.Enqueue(Tuple.Create(id, DateTime.UtcNow)); 
}
answered Sep 21 '22 by Drew Marsh


This is pretty similar to the approach you said you already had in your question, but implemented with TPL tasks. A task simply adds itself back to the list of things to schedule when it's done.

The use of locking on a plain List in this example is fairly ugly; you would probably want a better collection to hold the list of things to schedule.

// Fill the idsToSchedule
for (int id = 0; id < 5; id++)
{
    idsToSchedule.Add(Tuple.Create(DateTime.MinValue, id));
}

// LongRunning will tell TPL to create a new thread to run this on
Task.Factory.StartNew(SchedulingLoop, TaskCreationOptions.LongRunning);

That starts up the SchedulingLoop, which performs the actual check of whether it has been two seconds since something last ran:

// Tuple of the last time an id was processed and the id of the thing to schedule
static List<Tuple<DateTime, int>> idsToSchedule = new List<Tuple<DateTime, int>>();
static int currentlyProcessing = 0;
const int ProcessingLimit = 3;

// An event loop that performs the scheduling
public static void SchedulingLoop()
{
    while (true)
    {
        lock (idsToSchedule)
        {
            DateTime currentTime = DateTime.Now;
            for (int index = idsToSchedule.Count - 1; index >= 0; index--)
            {
                var scheduleItem = idsToSchedule[index];
                var timeSincePreviousRun = (currentTime - scheduleItem.Item1).TotalSeconds;

                // start it executing in a background task
                if (timeSincePreviousRun > 2 && currentlyProcessing < ProcessingLimit)
                {
                    Interlocked.Increment(ref currentlyProcessing);

                    Console.WriteLine("Scheduling {0} after {1} seconds", scheduleItem.Item2, timeSincePreviousRun);

                    // Schedule this task to be processed
                    Task.Factory.StartNew(() =>
                        {
                            Console.WriteLine("Executing {0}", scheduleItem.Item2);

                            // simulate the time taken to call this procedure
                            Thread.Sleep(new Random((int)DateTime.Now.Ticks).Next(0, 5000) + 500);

                            lock (idsToSchedule)
                            {
                                idsToSchedule.Add(Tuple.Create(DateTime.Now, scheduleItem.Item2));
                            }

                            Console.WriteLine("Done Executing {0}", scheduleItem.Item2);
                            Interlocked.Decrement(ref currentlyProcessing);
                        });

                    // remove this from the list of things to schedule
                    idsToSchedule.RemoveAt(index);
                }
            }
        }

        Thread.Sleep(100);
    }
}
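As a sketch of the "better collection" mentioned above, the locked List could be swapped for a ConcurrentQueue so that the scheduling loop and the finishing tasks never contend on a lock. This is illustrative only; the names mirror the sample above:

```csharp
// Illustrative lock-free variant of the sample above: finished tasks
// enqueue themselves, and the loop dequeues and re-enqueues not-yet-due items.
static ConcurrentQueue<Tuple<DateTime, int>> idsToSchedule =
    new ConcurrentQueue<Tuple<DateTime, int>>();

public static void SchedulingLoop()
{
    while (true)
    {
        int itemsToExamine = idsToSchedule.Count;
        Tuple<DateTime, int> scheduleItem;

        // Examine each item at most once per pass
        for (int i = 0; i < itemsToExamine && idsToSchedule.TryDequeue(out scheduleItem); i++)
        {
            if ((DateTime.Now - scheduleItem.Item1).TotalSeconds > 2
                && currentlyProcessing < ProcessingLimit)
            {
                Interlocked.Increment(ref currentlyProcessing);
                int id = scheduleItem.Item2;

                Task.Factory.StartNew(() =>
                {
                    // ... execute sproc ...
                    idsToSchedule.Enqueue(Tuple.Create(DateTime.Now, id));
                    Interlocked.Decrement(ref currentlyProcessing);
                });
            }
            else
            {
                // Not due yet (or at the parallelism limit), put it back
                idsToSchedule.Enqueue(scheduleItem);
            }
        }

        Thread.Sleep(100);
    }
}
```

Only the single scheduling loop increments currentlyProcessing, so the check-then-increment pair cannot overshoot ProcessingLimit even without the lock.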
answered Sep 18 '22 by BrandonAGr