
How to run a Task on a custom TaskScheduler using await?

I have some methods returning Task<T> on which I can await at will. I'd like to have those Tasks executed on a custom TaskScheduler instead of the default one.

var task = GetTaskAsync ();
await task;

I know I can create a new TaskFactory(new CustomScheduler()) and call StartNew() on it, but StartNew() takes an action and creates the Task, and I already have the Task (returned behind the scenes by a TaskCompletionSource).

How can I specify my own TaskScheduler for await?

Stephane Delcroix asked Mar 15 '13 09:03


3 Answers

I think what you really want is to do a Task.Run, but with a custom scheduler. StartNew doesn't work intuitively with asynchronous methods; Stephen Toub has a great blog post about the differences between Task.Run and TaskFactory.StartNew.

So, to create your own custom Run, you can do something like this:

private static readonly TaskFactory myTaskFactory = new TaskFactory(
    CancellationToken.None, TaskCreationOptions.DenyChildAttach,
    TaskContinuationOptions.None, new MyTaskScheduler());

private static Task RunOnMyScheduler(Func<Task> func)
{
    return myTaskFactory.StartNew(func).Unwrap();
}

private static Task<T> RunOnMyScheduler<T>(Func<Task<T>> func)
{
    return myTaskFactory.StartNew(func).Unwrap();
}

private static Task RunOnMyScheduler(Action func)
{
    return myTaskFactory.StartNew(func);
}

private static Task<T> RunOnMyScheduler<T>(Func<T> func)
{
    return myTaskFactory.StartNew(func);
}

Then you can execute synchronous or asynchronous methods on your custom scheduler.
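
As a rough illustration of the helpers above, here is a minimal sketch that runs an async delegate on a scheduler backed by one dedicated thread. SingleThreadScheduler, SchedulerDemo, ObserveThreadsAsync and the thread name "my-scheduler" are hypothetical names made up for this example; only the RunOnMyScheduler pattern comes from the answer. It assumes no SynchronizationContext is installed (as in a console application), in which case the code after the await should also be scheduled through the custom scheduler.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical scheduler, for illustration only:
// every queued task is executed on one dedicated background thread.
public sealed class SingleThreadScheduler : TaskScheduler
{
    private readonly BlockingCollection<Task> _queue = new BlockingCollection<Task>();

    public SingleThreadScheduler()
    {
        var thread = new Thread(() =>
        {
            // Blocks until a task is available, then runs it on this thread.
            foreach (var task in _queue.GetConsumingEnumerable())
                TryExecuteTask(task);
        })
        { IsBackground = true, Name = "my-scheduler" };
        thread.Start();
    }

    protected override void QueueTask(Task task) => _queue.Add(task);

    // Never inline: tasks must always go through the dedicated thread.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;

    protected override IEnumerable<Task> GetScheduledTasks() => _queue.ToArray();
}

public static class SchedulerDemo
{
    private static readonly TaskFactory myTaskFactory = new TaskFactory(
        CancellationToken.None, TaskCreationOptions.DenyChildAttach,
        TaskContinuationOptions.None, new SingleThreadScheduler());

    private static Task<T> RunOnMyScheduler<T>(Func<Task<T>> func)
    {
        return myTaskFactory.StartNew(func).Unwrap();
    }

    // Returns the thread names observed before and after the await.
    public static Task<string[]> ObserveThreadsAsync()
    {
        return RunOnMyScheduler(async () =>
        {
            string before = Thread.CurrentThread.Name;
            await Task.Delay(100);
            string after = Thread.CurrentThread.Name;
            return new[] { before, after };
        });
    }
}
```

Awaiting SchedulerDemo.ObserveThreadsAsync() from a console Main should give the scheduler thread's name for both entries, because await captures TaskScheduler.Current when no SynchronizationContext is present.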

Stephen Cleary answered Oct 04 '22 11:10


The TaskCompletionSource<T>.Task is constructed without any action and the scheduler is assigned on the first call to ContinueWith(...) (from Asynchronous Programming with the Reactive Framework and the Task Parallel Library — Part 3).

Thankfully, you can customize the await behavior slightly by writing your own awaiter type that implements INotifyCompletion, and then using it in a pattern similar to await SomeTask.ConfigureAwait(false). The awaiter's OnCompleted(Action continuation) method is where you choose the scheduler on which the continuation runs (from await anything;).

Here is the usage:

    TaskCompletionSource<object> source = new TaskCompletionSource<object>();

    public async Task Foo() {
        // Force await to schedule the task on the supplied scheduler
        await SomeAsyncTask().ConfigureScheduler(scheduler);
    }

    public Task SomeAsyncTask() { return source.Task; }

Here is a simple implementation of ConfigureScheduler using a Task extension method with the important part in OnCompleted:

using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

public static class TaskExtension {
    public static CustomTaskAwaitable ConfigureScheduler(this Task task, TaskScheduler scheduler) {
        return new CustomTaskAwaitable(task, scheduler);
    }
}

public struct CustomTaskAwaitable {
    CustomTaskAwaiter awaitable;

    public CustomTaskAwaitable(Task task, TaskScheduler scheduler) {
        awaitable = new CustomTaskAwaiter(task, scheduler);
    }

    public CustomTaskAwaiter GetAwaiter() { return awaitable; }

    public struct CustomTaskAwaiter : INotifyCompletion {
        Task task;
        TaskScheduler scheduler;

        public CustomTaskAwaiter(Task task, TaskScheduler scheduler) {
            this.task = task;
            this.scheduler = scheduler;
        }

        public void OnCompleted(Action continuation) {
            // ContinueWith sets the scheduler to use for the continuation action
            task.ContinueWith(x => continuation(), scheduler);
        }

        public bool IsCompleted { get { return task.IsCompleted; } }

        // Rethrow the awaited task's exception or cancellation instead of swallowing it
        public void GetResult() { task.GetAwaiter().GetResult(); }
    }
}

Here's a working sample that will compile as a console application:

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

namespace Example {
    class Program {
        static TaskCompletionSource<object> source = new TaskCompletionSource<object>();
        static TaskScheduler scheduler = new CustomTaskScheduler();

        static void Main(string[] args) {
            Console.WriteLine("Main Started");
            var task = Foo();
            Console.WriteLine("Main Continue ");
            // Continue Foo() using CustomTaskScheduler
            source.SetResult(null);
            Console.WriteLine("Main Finished");
        }

        public static async Task Foo() {
            Console.WriteLine("Foo Started");
            // Force await to schedule the task on the supplied scheduler
            await SomeAsyncTask().ConfigureScheduler(scheduler);
            Console.WriteLine("Foo Finished");
        }

        public static Task SomeAsyncTask() { return source.Task; }
    }

    public struct CustomTaskAwaitable {
        CustomTaskAwaiter awaitable;

        public CustomTaskAwaitable(Task task, TaskScheduler scheduler) {
            awaitable = new CustomTaskAwaiter(task, scheduler);
        }

        public CustomTaskAwaiter GetAwaiter() { return awaitable; }

        public struct CustomTaskAwaiter : INotifyCompletion {
            Task task;
            TaskScheduler scheduler;

            public CustomTaskAwaiter(Task task, TaskScheduler scheduler) {
                this.task = task;
                this.scheduler = scheduler;
            }

            public void OnCompleted(Action continuation) {
                // ContinueWith sets the scheduler to use for the continuation action
                task.ContinueWith(x => continuation(), scheduler);
            }

            public bool IsCompleted { get { return task.IsCompleted; } }

            // Rethrow the awaited task's exception or cancellation instead of swallowing it
            public void GetResult() { task.GetAwaiter().GetResult(); }
        }
    }

    public static class TaskExtension {
        public static CustomTaskAwaitable ConfigureScheduler(this Task task, TaskScheduler scheduler) {
            return new CustomTaskAwaitable(task, scheduler);
        }
    }

    public class CustomTaskScheduler : TaskScheduler {
        protected override IEnumerable<Task> GetScheduledTasks() { yield break; }
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { return false; }
        protected override void QueueTask(Task task) {
            // Runs the task synchronously on the thread that queued it
            TryExecuteTask(task);
        }
    }
}

Adam Davidson answered Oct 04 '22 11:10


There is no way to embed rich async features into a custom TaskScheduler. This class was not designed with async/await in mind. The standard way to use a custom TaskScheduler is to pass it as an argument to the Task.Factory.StartNew method. This method does not understand async delegates. It is possible to provide an async delegate, but it is treated like any other delegate that returns some result, so the task returned by StartNew is a nested Task<Task>. To get the actual awaited result of the async delegate, one must call Unwrap() on the returned task.
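
To make the nesting concrete, here is a small sketch of StartNew with an async delegate and an explicit scheduler. UnwrapDemo and RunAsync are hypothetical names used only for this illustration, and the value 42 is arbitrary.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class UnwrapDemo
{
    public static async Task<int> RunAsync(TaskScheduler scheduler)
    {
        // StartNew only sees a delegate that returns Task<int>,
        // so the task it creates is a nested Task<Task<int>>.
        Task<Task<int>> outer = Task.Factory.StartNew(
            async () =>
            {
                await Task.Delay(100);
                return 42;
            },
            CancellationToken.None, TaskCreationOptions.None, scheduler);

        // The outer task completes as soon as the delegate returns (at its first
        // incomplete await). Unwrap() gives a proxy for the inner task, which
        // completes only when the whole async delegate has finished.
        Task<int> inner = outer.Unwrap();
        return await inner;
    }
}
```

For example, await UnwrapDemo.RunAsync(TaskScheduler.Default) yields the delegate's return value rather than an inner Task object.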

This is not the problem though. The problem is that the TaskScheduler infrastructure does not treat the async delegate as a single unit of work. Each task is split into multiple mini-tasks (using every await as a separator), and each mini-task is processed individually. This severely restricts the asynchronous functionality that can be implemented on top of this class. As an example here is a custom TaskScheduler that is intended to queue the supplied tasks one at a time (to limit the concurrency in other words):

public class MyTaskScheduler : TaskScheduler
{
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1);

    protected async override void QueueTask(Task task)
    {
        await _semaphore.WaitAsync();
        try
        {
            await Task.Run(() => base.TryExecuteTask(task));
            await task;
        }
        finally
        {
            _semaphore.Release();
        }
    }

    protected override bool TryExecuteTaskInline(Task task,
        bool taskWasPreviouslyQueued) => false;

    protected override IEnumerable<Task> GetScheduledTasks() { yield break; }
}

The SemaphoreSlim should ensure that only one Task would run at a time. Unfortunately it doesn't work. The semaphore is released prematurely, because the Task passed in the call QueueTask(task) is not the task that represents the whole work of the async delegate, but only the part until the first await. The other parts are passed to the TryExecuteTaskInline method. There is no way to correlate these task-parts, because no identifier or other mechanism is provided. Here is what happens in practice:

var taskScheduler = new MyTaskScheduler();
var tasks = Enumerable.Range(1, 5).Select(n => Task.Factory.StartNew(async () =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Item {n} Started");
    await Task.Delay(1000);
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Item {n} Finished");
}, default, TaskCreationOptions.None, taskScheduler))
.Select(t => t.Unwrap())
.ToArray();
Task.WaitAll(tasks);

Output:

05:29:58.346 Item 1 Started
05:29:58.358 Item 2 Started
05:29:58.358 Item 3 Started
05:29:58.358 Item 4 Started
05:29:58.358 Item 5 Started
05:29:59.358 Item 1 Finished
05:29:59.374 Item 5 Finished
05:29:59.374 Item 4 Finished
05:29:59.374 Item 2 Finished
05:29:59.374 Item 3 Finished

Disaster: all five items start at practically the same time instead of running one after another.

Conclusion: Customizing the TaskScheduler class is not the way to go when advanced async features are required.
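
For comparison, one commonly used alternative (not part of the experiment above, and only a sketch) is to apply the SemaphoreSlim around the async delegate itself instead of inside a scheduler, so that the semaphore is held across every await of the delegate. Throttle and RunAllAsync are hypothetical names chosen for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class Throttle
{
    // Runs all async delegates, allowing at most maxConcurrency of them in flight.
    public static async Task RunAllAsync(IEnumerable<Func<Task>> work, int maxConcurrency)
    {
        using (var semaphore = new SemaphoreSlim(maxConcurrency))
        {
            Task[] tasks = work.Select(async item =>
            {
                await semaphore.WaitAsync();
                try
                {
                    // The slot stays taken until the whole delegate completes,
                    // including everything after its awaits.
                    await item();
                }
                finally
                {
                    semaphore.Release();
                }
            }).ToArray();

            await Task.WhenAll(tasks);
        }
    }
}
```

With maxConcurrency set to 1, each delegate should start only after the previous one has fully finished, which is the behavior the custom scheduler above was intended to provide.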

Update: Here is another observation, regarding custom TaskSchedulers in the presence of an ambient SynchronizationContext. The await mechanism by default captures the current SynchronizationContext, or the current TaskScheduler, and invokes the continuation on either the captured context or the scheduler. If both are present, the current SynchronizationContext is preferred, and the current TaskScheduler is ignored. Below is a demonstration of this behavior, in a WinForms application¹:

private async void Button1_Click(object sender, EventArgs e)
{
    await Task.Factory.StartNew(async () =>
    {
        MessageBox.Show($"{Thread.CurrentThread.ManagedThreadId}, {TaskScheduler.Current}");
        await Task.Delay(1000);
        MessageBox.Show($"{Thread.CurrentThread.ManagedThreadId}, {TaskScheduler.Current}");
    }, default, TaskCreationOptions.None,
        TaskScheduler.FromCurrentSynchronizationContext()).Unwrap();
}

Clicking the button causes two messages to pop up sequentially, with this information:

1, System.Threading.Tasks.SynchronizationContextTaskScheduler

1, System.Threading.Tasks.ThreadPoolTaskScheduler

This experiment shows that only the first part of the asynchronous delegate, the part before the first await, was scheduled on the non-default scheduler. This behavior limits even further the practical usefulness of custom TaskSchedulers in an async/await-enabled environment.

¹ Windows Forms applications have a WindowsFormsSynchronizationContext installed automatically, when the Application.Run method is called.

Theodor Zoulias answered Oct 04 '22 11:10