Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Azure Worker Role multithread queue processing

I have an azure Cloud service with classic configuration WebRole + Worker role. The Worker takes messages from a queue, process it than delete it once at a time.

My code is like this:

public override void Run()
    {
        Trace.TraceInformation("Worker is running");
            try
            {
                this.RunAsync(this.cancellationTokenSource.Token).Wait();
            }
            finally
            {
                this.runCompleteEvent.Set();
            }
    }

public override bool OnStart()
        {
            ServicePointManager.DefaultConnectionLimit = 500;
            bool result = base.OnStart();
            Trace.TraceInformation("WorkerAnalytics has been started");
            return result;
        }



private async Task RunAsync(CancellationToken cancellationToken)
        {
            var queue = ....//omitted info for brevity
            CloudQueueMessage retrievedMessage = null;

            while (!cancellationToken.IsCancellationRequested)
            {
                 try
                    {
                        retrievedMessage = await queue.GetMessageAsync();
                        if (retrievedMessage != null)
                        {
                            await ProcessMessage(retrievedMessage);
                        }
                        else
                        {
                            System.Threading.Thread.Sleep(500);
                        }
                    }
                    catch (Exception e)
                    {
                        System.Threading.Thread.Sleep(500);
                    }
                }
            }

        }

Now this works perfectly but the CPU is very low, at 3%, it process only one element at a time (about 1 sec each), but the queue have about 1000 new elements per second and it is not enough.

How can process more queue messages at a time using all the CPU power the machine has and without complicating too much this code?

Also what the ServicePointManager.DefaultConnectionLimit is for?

I searched for hours for an effective multithreaded solution for Worker Roles but is all WebJobs now or old frameworks that complicate things.

Thank you

like image 538
Francesco Cristallo Avatar asked Dec 29 '25 15:12

Francesco Cristallo


1 Answers

You can try run multiple taks of RunAsync().

var tasks = new List<Task>();
tasks.Add(this.RunAsync(this.cancellationTokenSource.Token));
tasks.Add(this.RunAsync(this.cancellationTokenSource.Token));
tasks.Add(this.RunAsync(this.cancellationTokenSource.Token));
Task.WaitAll(tasks.ToArray());
like image 183
pauliusnrk Avatar answered Jan 01 '26 07:01

pauliusnrk



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!