
How do I prevent "maxing out" of CPU: Synchronous method calling multiple workers asynchronously & throttling using SemaphoreSlim?

I am currently optimizing an existing, very slow and timing out production application. There is no option to re-write it.

In short, it is a WCF service that currently calls 4 other "worker" WCF services sequentially. None of the worker services are dependent on results from the other. So we would like it to call them all at once (not sequentially). I will reiterate that we don't have the luxury of re-writing it.


The optimization involves making it call all worker services at once. This is where asynchrony came to mind.

I have limited experience with asynchronous programming, but I have read as widely as I can on the topic, with respect to my solution.

The problem is that, on testing, it works but maxes out my CPU. I would appreciate your help.

The following is a simplified version of the essential code in the main WCF service:

// The service operation belonging to main WCF Service
public void ProcessAllPendingWork()
{
    var workerTasks = new List<Task<bool>>();
    foreach(var workerService in _workerServices)
    {
        //DoWorkAsync is the worker method with the following signature:
        // Task<bool> DoWorkAsync()

        var workerTask = workerService.DoWorkAsync();
        workerTasks.Add(workerTask);
    }

    var task = Task.Run(async ()=>
    {
        await RunWorkerTasks(workerTasks);
    });
    task.Wait();


}

private async Task RunWorkerTasks(IEnumerable<Task<bool>> workerTasks)
{
    using(var semaphore = new SemaphoreSlim(initialCount:3))
    {

        foreach (var workerTask in workerTasks)
        {
            await semaphore.WaitAsync();
            try
            {
                await workerTask;
            }
            catch (System.Exception ex)
            {
                //assume 'Log' is a predefined logging service
                Log.Error(ex);
            }
        }
    }
} 

What I have read:

Multiple ways how to limit parallel tasks processing

How to limit the amount of concurrent async I/O operations?

Approaches for throttling asynchronous methods in C#

Constraining Concurrent Threads in C#

Limiting Number of Concurrent Threads With SemaphoresSlim

Async WCF call with ChannelFactory and CreateChannel

asked Aug 20 '19 11:08 by user919426


2 Answers

You didn't explain how you wanted to limit the concurrent calls. Do you want 30 concurrent worker tasks running, or do you want 30 WCF calls, each of which has all its worker tasks running concurrently, or do you want concurrent WCF calls to each have their own limit of concurrent worker tasks? Given you said that each WCF call has only 4 worker tasks and looking at your sample code, I assume you want a global limit of 30 concurrent worker tasks.

Firstly, as @mjwills implied, you need to use the SemaphoreSlim to limit the calls to workerService.DoWorkAsync(). Your code currently starts all of them and only tries to throttle how many you wait on to finish. I assume this is why you max out the CPU: the number of worker tasks started remains unbounded. Note, however, that you also need to await the worker task while you hold the semaphore; otherwise you'll only throttle how fast you create tasks, not how many run concurrently.
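
To illustrate why awaiting later does not throttle anything: an async method starts running as soon as it is called, and runs synchronously up to its first incomplete await. The following is a minimal sketch of that behaviour, not the asker's actual service code. HotTaskDemo, FakeWorkAsync and CountStartedBeforeAwaitAsync are hypothetical names, and Task.Delay stands in for the real DoWorkAsync() call.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class HotTaskDemo
{
    private static int _started;

    // Hypothetical stand-in for a worker call such as DoWorkAsync().
    private static async Task<bool> FakeWorkAsync()
    {
        Interlocked.Increment(ref _started); // runs synchronously, at call time
        await Task.Delay(200);               // simulated I/O wait
        return true;
    }

    // Returns how many workers had already started before anything was awaited.
    public static async Task<int> CountStartedBeforeAwaitAsync(int workerCount)
    {
        _started = 0;
        var tasks = new List<Task<bool>>();
        for (var i = 0; i < workerCount; i++)
        {
            tasks.Add(FakeWorkAsync()); // the call itself starts the work
        }

        var startedBeforeAwait = Volatile.Read(ref _started);
        await Task.WhenAll(tasks);
        return startedBeforeAwait;
    }
}
```

Every worker has already started by the time the first await is reached, so a semaphore placed around the await alone cannot limit how many run at once.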

Secondly, you're creating a new SemaphoreSlim for each WCF request. Hence my question from my first paragraph. The only way this will throttle anything is if you have more worker services than the initial count, which in your sample is 30, but you said there are only 4 workers. To have a "global" limit, you need to use a singleton SemaphoreSlim.

Thirdly, you never call .Release() on the SemaphoreSlim, so if you did make it a singleton, your code will hang once it's started 30 workers since the process started. Make sure to do it in a try-finally block, so that if the worker crashes, it still gets released.

Here's some hastily written sample code:

public async Task ProcessAllPendingWork()
{
    var workerTasks = new List<Task<bool>>();
    foreach(var workerService in _workerServices)
    {
        var workerTask = RunWorker(workerService);
        workerTasks.Add(workerTask);
    }

    await Task.WhenAll(workerTasks);
}

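// IWorkerService is a placeholder name for whatever proxy type exposes Task<bool> DoWorkAsync()
private async Task<bool> RunWorker(IWorkerService workerService)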
{
    // use singleton semaphore.
    await _semaphore.WaitAsync();
    try
    {
        return await workerService.DoWorkAsync();
    }
    catch (System.Exception ex)
    {
        //assume 'Log' is a predefined logging service
        Log.Error(ex);
        return false; // ??
    }
    finally
    {
        _semaphore.Release();
    }
}
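
One way to sanity-check this pattern is a small self-contained sketch like the one below. It assumes a process-wide static SemaphoreSlim with an arbitrary limit of 3, and uses a hypothetical FakeWorkAsync (just a Task.Delay) in place of the real WCF call. ThrottleDemo, FakeWorkAsync, RunThrottledAsync and MeasurePeakAsync are made-up names for illustration only. The sketch records the highest number of workers in flight at any moment.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    // Shared by all callers; the limit of 3 is an arbitrary value for this demo.
    private static readonly SemaphoreSlim Semaphore = new SemaphoreSlim(3);

    private static int _running;
    private static int _peak;

    // Hypothetical stand-in for DoWorkAsync(); tracks how many run at once.
    private static async Task<bool> FakeWorkAsync()
    {
        var now = Interlocked.Increment(ref _running);
        int seen;
        while (now > (seen = Volatile.Read(ref _peak)))
        {
            // Raise the recorded peak unless another worker changed it first.
            Interlocked.CompareExchange(ref _peak, now, seen);
        }

        try
        {
            await Task.Delay(50); // simulated I/O wait
            return true;
        }
        finally
        {
            Interlocked.Decrement(ref _running);
        }
    }

    // The semaphore is taken BEFORE the work starts and released in finally.
    private static async Task<bool> RunThrottledAsync()
    {
        await Semaphore.WaitAsync();
        try
        {
            return await FakeWorkAsync();
        }
        finally
        {
            Semaphore.Release();
        }
    }

    // Starts workerCount throttled workers and returns the observed peak.
    public static async Task<int> MeasurePeakAsync(int workerCount)
    {
        _running = 0;
        _peak = 0;
        var tasks = Enumerable.Range(0, workerCount)
            .Select(_ => RunThrottledAsync())
            .ToArray(); // force all calls to be issued now
        await Task.WhenAll(tasks);
        return Volatile.Read(ref _peak);
    }
}
```

With a limit of 3, MeasurePeakAsync(10) should never report more than 3 workers in flight, whereas removing the WaitAsync/Release pair would let all 10 start at once.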

answered Oct 15 '22 16:10 by zivkan


The Task abstraction provided by the TPL (Task Parallel Library) is an abstraction over threads: tasks are queued to a thread pool and executed when a pool thread becomes available to handle them.

In other words, depending on several factors (your traffic, whether the work is CPU-bound or I/O-bound, and your deployment model), running a managed Task in your worker function may bring no benefit at all (or may even be slower in some cases).

That said, I suggest using Task.WaitAll (available since .NET 4.0), which relies on very high-level abstractions to manage concurrency. In particular, this piece of code could be useful to you:

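  • it creates the workers and waits for all of them
  • it takes 10 seconds to execute (the duration of the longest worker)
  • it catches exceptions and gives you the opportunity to handle them
  • [last but not least] it is a declarative API that focuses your attention on what to do rather than how to do it.

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using static System.TimeSpan; // for FromSeconds

public class Q57572902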
{
    public void ProcessAllPendingWork()
    {
        var workers = new Action[] {Worker1, Worker2, Worker3};

        try
        {
            Task.WaitAll(workers.Select(Task.Factory.StartNew).ToArray());
            // ok
        }
        catch (AggregateException exceptions)
        {
            foreach (var ex in exceptions.InnerExceptions)
            {
                Log.Error(ex);
            }
            // ko
        }
    }

    public void Worker1() => Thread.Sleep(FromSeconds(5)); // do something

    public void Worker2() => Thread.Sleep(FromSeconds(10)); // do something

    public void Worker3() => throw new NotImplementedException("error to manage"); // something wrong

}

I have seen from the comments that you require a maximum of 3 workers running at the same time; in that case you can simply copy the LimitedConcurrencyLevelTaskScheduler example from the TaskScheduler documentation.

After that, create a singleton TaskScheduler instance with its own TaskFactory, like this:

public static class WorkerScheduler
{
    public static readonly TaskFactory Factory;

    static WorkerScheduler()
    {
        var scheduler = new LimitedConcurrencyLevelTaskScheduler(3);
        Factory = new TaskFactory(scheduler);
    }
}

Previous ProcessAllPendingWork() code remains the same except for

...workers.Select(Task.Factory.StartNew)...

that becomes

...workers.Select(WorkerScheduler.Factory.StartNew)...

because you have to use the TaskFactory associated with your custom WorkerScheduler.

If your workers need to return data for the response, errors and data need to be handled differently, as follows:

public void ProcessAllPendingWork()
{
    var workers = new Func<bool>[] {Worker1, Worker2, Worker3};
    var tasks = workers.Select(WorkerScheduler.Factory.StartNew).ToArray();

    bool[] results = null;

    Task
        .WhenAll(tasks)
        .ContinueWith(x =>
        {
            if (x.Status == TaskStatus.Faulted)
            {
                foreach (var exception in x.Exception.InnerExceptions)
                    Log.Error(exception);

                return;
            }

            results = x.Result; // save data in outer scope
        })
        .Wait();

    // continue execution
    // results is now filled: if results is null, some errors occurred
}

answered Oct 15 '22 16:10 by Claudio