Is there something in the framework that would allow me to asynchronously execute a queue of delegates?
What I mean by that is I want the delegates to execute one at a time in the order they are queued but I want this whole process to run asynchronously. The queue is not fixed either, additional delegates would be added periodically and should be processed as soon as it reaches the top of the queue.
I don't need to use a Queue in particular, it's just how I would describe the desired behavior.
I could write something myself to do it but if there is something built in I could use instead that would be better.
I briefly looked at ThreadPool.QueueUserWorkItem as it allows executing in order but could find a satisfactory way to prevent more than one execution at a time.
Is there something in the framework that would allow me to asynchronously execute a queue of delegates?
I'd implement this as a custom task scheduler. You could then queue and run your delegates as tasks, which would give you all benefits of exception handling, cancellation, and async/await.
Implementing a task scheduler which would execute your delegates in the serial order is quite simple, using BlockingCollection. The SerialTaskScheduler below is a simplified version of Stephen Toub's StaTaskScheduler:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Console_21628490
{
// Test
class Program
{
static async Task DoWorkAsync()
{
using (var scheduler = new SerialTaskScheduler())
{
var tasks = Enumerable.Range(1, 10).Select(i =>
scheduler.Run(() =>
{
var sleep = 1000 / i;
Thread.Sleep(sleep);
Console.WriteLine("Task #" + i + ", sleep: " + sleep);
}, CancellationToken.None));
await Task.WhenAll(tasks);
}
}
static void Main(string[] args)
{
DoWorkAsync().Wait();
Console.ReadLine();
}
}
// SerialTaskScheduler
public sealed class SerialTaskScheduler : TaskScheduler, IDisposable
{
Task _schedulerTask;
BlockingCollection<Task> _tasks;
Thread _schedulerThread;
public SerialTaskScheduler()
{
_tasks = new BlockingCollection<Task>();
_schedulerTask = Task.Run(() =>
{
_schedulerThread = Thread.CurrentThread;
foreach (var task in _tasks.GetConsumingEnumerable())
TryExecuteTask(task);
});
}
protected override void QueueTask(Task task)
{
_tasks.Add(task);
}
protected override IEnumerable<Task> GetScheduledTasks()
{
return _tasks.ToArray();
}
protected override bool TryExecuteTaskInline(
Task task, bool taskWasPreviouslyQueued)
{
return _schedulerThread == Thread.CurrentThread &&
TryExecuteTask(task);
}
public override int MaximumConcurrencyLevel
{
get { return 1; }
}
public void Dispose()
{
if (_schedulerTask != null)
{
_tasks.CompleteAdding();
_schedulerTask.Wait();
_tasks.Dispose();
_tasks = null;
_schedulerTask = null;
}
}
public Task Run(Action action, CancellationToken token)
{
return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this);
}
public Task Run(Func<Task> action, CancellationToken token)
{
return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
}
public Task<T> Run<T>(Func<Task<T>> action, CancellationToken token)
{
return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
}
}
}
Output:
Task #1, sleep: 1000 Task #2, sleep: 500 Task #3, sleep: 333 Task #4, sleep: 250 Task #5, sleep: 200 Task #6, sleep: 166 Task #7, sleep: 142 Task #8, sleep: 125 Task #9, sleep: 111 Task #10, sleep: 100
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With