
Queuing Actions/Delegates for Asynchronous Execution

Is there something in the framework that would allow me to asynchronously execute a queue of delegates?

What I mean is that I want the delegates to execute one at a time, in the order they are queued, but I want the whole process to run asynchronously. The queue is not fixed either: additional delegates would be added periodically, and each should be processed as soon as it reaches the front of the queue.

I don't need to use a Queue in particular; it's just how I would describe the desired behavior.

I could write something myself to do it but if there is something built in I could use instead that would be better.

I briefly looked at ThreadPool.QueueUserWorkItem, as it allows executing in order, but I could not find a satisfactory way to prevent more than one execution at a time.

Ashigore asked Nov 22 '25 18:11


1 Answer

Is there something in the framework that would allow me to asynchronously execute a queue of delegates?

I'd implement this as a custom task scheduler. You could then queue and run your delegates as tasks, which would give you all the benefits of task-based exception handling, cancellation, and async/await.

Implementing a task scheduler that executes your delegates in serial order is fairly simple with BlockingCollection. The SerialTaskScheduler below is a simplified version of Stephen Toub's StaTaskScheduler:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Console_21628490
{
    // Test
    class Program
    {
        static async Task DoWorkAsync()
        {
            using (var scheduler = new SerialTaskScheduler())
            {
                var tasks = Enumerable.Range(1, 10).Select(i =>
                    scheduler.Run(() =>
                    {
                        var sleep = 1000 / i;
                        Thread.Sleep(sleep);
                        Console.WriteLine("Task #" + i + ", sleep: " + sleep);
                    }, CancellationToken.None));

                await Task.WhenAll(tasks);
            }
        }

        static void Main(string[] args)
        {
            DoWorkAsync().Wait();
            Console.ReadLine();
        }
    }

    // SerialTaskScheduler: runs queued tasks one at a time, in FIFO order,
    // on a single consumer thread.
    public sealed class SerialTaskScheduler : TaskScheduler, IDisposable
    {
        Task _schedulerTask;
        BlockingCollection<Task> _tasks;
        Thread _schedulerThread;

        public SerialTaskScheduler()
        {
            _tasks = new BlockingCollection<Task>();

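            // Dedicated consumer. LongRunning avoids tying up a thread-pool
            // thread that blocks for the scheduler's whole lifetime.
            _schedulerTask = Task.Factory.StartNew(() =>
            {
                _schedulerThread = Thread.CurrentThread;

                // Blocks until a task is available; ends after CompleteAdding().
                foreach (var task in _tasks.GetConsumingEnumerable())
                    TryExecuteTask(task);
            }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);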
        }

        protected override void QueueTask(Task task)
        {
            _tasks.Add(task);
        }

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return _tasks.ToArray();
        }

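        protected override bool TryExecuteTaskInline(
            Task task, bool taskWasPreviouslyQueued)
        {
            // Inline only on the scheduler's own thread; running a task on any
            // other thread could overlap with the task the consumer is executing.
            return _schedulerThread == Thread.CurrentThread &&
                TryExecuteTask(task);
        }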

        public override int MaximumConcurrencyLevel
        {
            get { return 1; }
        }

        public void Dispose()
        {
            if (_schedulerTask != null)
            {
                _tasks.CompleteAdding();
                _schedulerTask.Wait();
                _tasks.Dispose();
                _tasks = null;
                _schedulerTask = null;
            }
        }

        public Task Run(Action action, CancellationToken token)
        {
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this);
        }

        public Task Run(Func<Task> action, CancellationToken token)
        {
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
        }

        public Task<T> Run<T>(Func<Task<T>> action, CancellationToken token)
        {
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
        }
    }
}

Output:

Task #1, sleep: 1000
Task #2, sleep: 500
Task #3, sleep: 333
Task #4, sleep: 250
Task #5, sleep: 200
Task #6, sleep: 166
Task #7, sleep: 142
Task #8, sleep: 125
Task #9, sleep: 111
Task #10, sleep: 100
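
The core idea behind the scheduler, a single consumer draining a BlockingCollection, can also be sketched on its own. The following is a minimal illustration rather than a replacement for the code above: SerialActionQueue, Enqueue, and SerialActionQueueDemo are hypothetical names introduced only for this example, and catching and logging exceptions inside the loop is an assumption about the desired behavior.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of the answer above): a bare serial work queue.
public sealed class SerialActionQueue : IDisposable
{
    // BlockingCollection wraps a ConcurrentQueue by default, so items come out FIFO.
    private readonly BlockingCollection<Action> _actions = new BlockingCollection<Action>();
    private readonly Task _consumer;

    public SerialActionQueue()
    {
        // A single consumer drains the collection, so actions never overlap.
        _consumer = Task.Factory.StartNew(() =>
        {
            foreach (var action in _actions.GetConsumingEnumerable())
            {
                try { action(); }
                // Assumption: log and continue so one failure does not stop the queue.
                catch (Exception ex) { Console.Error.WriteLine(ex); }
            }
        }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }

    // Can be called at any time from any thread; returns immediately.
    public void Enqueue(Action action)
    {
        _actions.Add(action);
    }

    public void Dispose()
    {
        _actions.CompleteAdding(); // let the consumer loop finish
        _consumer.Wait();          // wait for the remaining actions to run
        _actions.Dispose();
    }
}

public static class SerialActionQueueDemo
{
    public static List<int> RunDemo()
    {
        var order = new List<int>();
        using (var queue = new SerialActionQueue())
        {
            for (int i = 1; i <= 5; i++)
            {
                int n = i; // copy the loop variable for the closure
                // Later items sleep less, yet they still finish in queue order.
                queue.Enqueue(() => { Thread.Sleep(50 / n); order.Add(n); });
            }
        } // Dispose blocks until the queue has drained
        return order;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", RunDemo())); // prints "1, 2, 3, 4, 5"
    }
}
```

Unlike SerialTaskScheduler, this sketch hands back no Task per delegate, so callers cannot await an individual item, observe its exception, or cancel it. Those are the benefits the task scheduler approach adds on top of the same queue.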
noseratio answered Nov 24 '25 08:11


