Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to set maximum concurrent threads using Task

I am writing a stress testing utility. In this utility I want that I continuously put load of 10 threads (out of 10,000). Here is my code

            Stopwatch watch = new Stopwatch();
        watch.Start();

        int itemProcessed = 0;

        do
        {
            List<Task> taskList = new List<Task>();
            for (int i = 0; i < _parallelThreadCount; i++)
            {
                taskList.Add(Task.Factory.StartNew(() => _taskDelegate()));
                itemProcessed++;
            }
            Task.WaitAll(taskList.ToArray());
        } while (itemProcessed < _batchSize);

        watch.Stop();

Now the problem is that I have use Task.WaitAll, due to which Initially load is 10 threads, then 9,8,7,6,5,4,3,2,1,0. And then again I add 10 More threads.

Can someone give me idea that how to achieve this.

like image 829
Zeeshan Umar Avatar asked Nov 27 '22 09:11

Zeeshan Umar


2 Answers

Shaamaan's answer is good and probably the one that you want to go with for your particular scenario. I'm just presenting a couple other possible options that you could use, and that may be more applicable for other situations.

My blog post shows how to do this both with Tasks and with Actions, and provides a sample project you can download and run to see both in action.

With Actions

If using Actions, you can use the built-in .Net Parallel.Invoke function. Here we limit it to running at most 10 threads in parallel.

var listOfActions = new List<Action>();
for (int i = 0; i < 10000; i++)
{
    // Note that we create the Action here, but do not start it.
    listOfActions.Add(() => DoSomething());
}

var options = new ParallelOptions {MaxDegreeOfParallelism = 10};
Parallel.Invoke(options, listOfActions.ToArray());

With Tasks

With Tasks there is no built-in function. However, you can use the one that I provide on my blog.

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static void StartAndWaitAllThrottled(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
    {
        StartAndWaitAllThrottled(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
    }

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static void StartAndWaitAllThrottled(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
    {
        // Convert to a list of tasks so that we don&#39;t enumerate over it multiple times needlessly.
        var tasks = tasksToRun.ToList();

        using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
        {
            var postTaskTasks = new List<Task>();

            // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
            tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

            // Start running each task.
            foreach (var task in tasks)
            {
                // Increment the number of tasks currently running and wait if too many are running.
                throttler.Wait(timeoutInMilliseconds, cancellationToken);

                cancellationToken.ThrowIfCancellationRequested();
                task.Start();
            }

            // Wait for all of the provided tasks to complete.
            // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler&#39;s using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
            Task.WaitAll(postTaskTasks.ToArray(), cancellationToken);
        }
    }

And then creating your list of Tasks and calling the function to have them run, with say a maximum of 10 simultaneous at a time, you could do this:

var listOfTasks = new List<Task>();
for (int i = 0; i < 10000; i++)
{
    var count = i;
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(() => Something()));
}
Tasks.StartAndWaitAllThrottled(listOfTasks, 10);
like image 51
deadlydog Avatar answered Dec 16 '22 01:12

deadlydog


If you could restructure your code a bit (read: replace your do while loop), you can make use of the Parallel class. Here's a quick example:

List<int> data = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
Parallel.ForEach(data, new ParallelOptions() { MaxDegreeOfParallelism = 10 }, d =>
{
    Console.WriteLine(d);
});

The bit you're likely most interested in is the MaxDegreeOfParallelism property of ParallelOptions - it specifies how many threads can be running at the same time.

EDIT:

As you don't have a list of tasks but rather just want to repeat the same operation a number of times, then you can use Parallel.For. Here's what the code might look like:

int repeatCount = 100;
int itemProcessed = 0;
Parallel.For(0, repeatCount, new ParallelOptions() { MaxDegreeOfParallelism = 10 }, i =>
{
    _taskDelegate();
    System.Threading.Interlocked.Increment(ref itemProcessed);
});

Note that if the only reason you used itemProcessed was to check how long your loop is to work, you can safely remove the two lines from the code above.

like image 32
Shaamaan Avatar answered Dec 16 '22 01:12

Shaamaan