I've got the following scenario, which I think might be quite common:
There is a task (a UI command handler) which can complete either synchronously or asynchronously.
Commands may arrive faster than they can be processed.
If there is already a pending task for a command, the new command handler task should be queued and processed sequentially.
Each new task's result may depend on the result of the previous task.
Cancellation should be observed, but I'd like to leave it outside the scope of this question for simplicity. Also, thread-safety (concurrency) is not a requirement, but re-entrancy must be supported.
Here's a basic example of what I'm trying to achieve (as a console app, for simplicity):
    using System;
    using System.Threading.Tasks;

    namespace ConsoleApp
    {
        class Program
        {
            static void Main(string[] args)
            {
                var asyncOp = new AsyncOp<int>();

                Func<int, Task<int>> handleAsync = async (arg) =>
                {
                    Console.WriteLine("this task arg: " + arg);
                    //await Task.Delay(arg); // make it async
                    return await Task.FromResult(arg); // sync
                };

                Console.WriteLine("Test #1...");
                asyncOp.RunAsync(() => handleAsync(1000));
                asyncOp.RunAsync(() => handleAsync(900));
                asyncOp.RunAsync(() => handleAsync(800));
                asyncOp.CurrentTask.Wait();

                Console.WriteLine("\nPress any key to continue to test #2...");
                Console.ReadLine();

                asyncOp.RunAsync(() =>
                {
                    asyncOp.RunAsync(() => handleAsync(200));
                    return handleAsync(100);
                });
                asyncOp.CurrentTask.Wait();

                Console.WriteLine("\nPress any key to exit...");
                Console.ReadLine();
            }

            // AsyncOp
            class AsyncOp<T>
            {
                Task<T> _pending = Task.FromResult(default(T));

                public Task<T> CurrentTask { get { return _pending; } }

                public Task<T> RunAsync(Func<Task<T>> handler)
                {
                    var pending = _pending;
                    Func<Task<T>> wrapper = async () =>
                    {
                        // await the prev task
                        var prevResult = await pending;
                        Console.WriteLine("\nprev task result: " + prevResult);
                        // start and await the handler
                        return await handler();
                    };

                    _pending = wrapper();
                    return _pending;
                }
            }
        }
    }
The output:
    Test #1...

    prev task result: 0
    this task arg: 1000

    prev task result: 1000
    this task arg: 900

    prev task result: 900
    this task arg: 800

    Press any key to continue to test #2...

    prev task result: 800

    prev task result: 800
    this task arg: 200
    this task arg: 100

    Press any key to exit...
It works in accordance with the requirements, until re-entrancy is introduced in test #2:
    asyncOp.RunAsync(() =>
    {
        asyncOp.RunAsync(() => handleAsync(200));
        return handleAsync(100);
    });
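The desired output should be 100, 200, rather than 200, 100, because there's already a pending outer task for 100. That's because the inner task executes synchronously, inside the outer wrapper() call and before the outer _pending = wrapper() assignment has taken place. The inner call therefore still sees the old _pending, which breaks the logic var pending = _pending; /* ... */ _pending = wrapper() for the outer task.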
How to make it work for test #2, too?
One solution would be to enforce asynchrony for every task, with Task.Factory.StartNew(..., TaskScheduler.FromCurrentSynchronizationContext()). However, I don't want to impose asynchronous execution upon command handlers which might be synchronous internally. Also, I don't want to depend on the behavior of any particular synchronization context (i.e. rely on Task.Factory.StartNew returning before the created task has actually been started).
In the real-life project, I'm responsible for what AsyncOp is above, but have no control over the command handlers (i.e., whatever is inside handleAsync).
I almost forgot it's possible to construct a Task manually, without starting or scheduling it. Then, "Task.Factory.StartNew" vs "new Task(...).Start" put me back on track. I think this is one of those few cases when the Task<TResult> constructor may actually be useful, along with nested tasks (Task<Task<T>>) and Task.Unwrap():
    // AsyncOp
    class AsyncOp<T>
    {
        Task<T> _pending = Task.FromResult(default(T));

        public Task<T> CurrentTask { get { return _pending; } }

        public Task<T> RunAsync(Func<Task<T>> handler, bool useSynchronizationContext = false)
        {
            var pending = _pending;
            Func<Task<T>> wrapper = async () =>
            {
                // await the prev task
                var prevResult = await pending;
                Console.WriteLine("\nprev task result: " + prevResult);
                // start and await the handler
                return await handler();
            };

            var task = new Task<Task<T>>(wrapper);
            var inner = task.Unwrap();
            _pending = inner;

            task.RunSynchronously(useSynchronizationContext ?
                TaskScheduler.FromCurrentSynchronizationContext() :
                TaskScheduler.Current);

            return inner;
        }
    }
The output:
    Test #1...

    prev task result: 0
    this task arg: 1000

    prev task result: 1000
    this task arg: 900

    prev task result: 900
    this task arg: 800

    Press any key to continue to test #2...

    prev task result: 800
    this task arg: 100

    prev task result: 100
    this task arg: 200
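The ordering above comes from the fact that a Task created with the constructor does not run until it is started, so the unwrapped proxy can be stored in _pending before the wrapper gets a chance to execute. The following is only a small sketch of that mechanism in isolation; ColdTaskDemo, Run and the log list are hypothetical names made up for the illustration and are not part of AsyncOp.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical demo type, not part of the original AsyncOp.
static class ColdTaskDemo
{
    // Returns what was observed at each step, in order.
    public static string[] Run()
    {
        var log = new List<string>();
        Task<int> proxy = null;

        // Constructing the task does not run the delegate yet.
        var cold = new Task<Task<int>>(() =>
        {
            // By the time the body runs, the proxy has already been published.
            log.Add("body sees proxy: " + (proxy != null));
            return Task.FromResult(42);
        });

        // Unwrap() yields a Task<int> standing in for the eventual inner task.
        proxy = cold.Unwrap();
        log.Add("proxy completed before run: " + proxy.IsCompleted);

        // Only now does the delegate execute, inline on the calling thread.
        cold.RunSynchronously();
        log.Add("result: " + proxy.Result);

        return log.ToArray();
    }
}
```

In AsyncOp, the assignment _pending = inner plays the role of publishing the proxy, which is why a re-entrant RunAsync call made from inside the handler queues itself behind the outer task.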
It's now also very easy to make AsyncOp thread-safe by adding a lock to protect _pending, if needed.
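As a rough illustration of the locking idea, here is a minimal sketch. The class name LockedAsyncOp and the _lock field are hypothetical, and starting the task outside the lock is an assumption made here so that the lock is not held while a handler runs; treat it as one possible shape rather than a definitive implementation.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical thread-safe variant of AsyncOp (names are illustrative).
class LockedAsyncOp<T>
{
    readonly object _lock = new object();
    Task<T> _pending = Task.FromResult(default(T));

    public Task<T> CurrentTask
    {
        get { lock (_lock) { return _pending; } }
    }

    public Task<T> RunAsync(Func<Task<T>> handler)
    {
        Task<Task<T>> task;
        Task<T> inner;

        // Read the previous task and publish the new one atomically.
        lock (_lock)
        {
            var pending = _pending;
            Func<Task<T>> wrapper = async () =>
            {
                await pending;          // wait for the previous task
                return await handler(); // then start and await the handler
            };

            task = new Task<Task<T>>(wrapper);
            inner = task.Unwrap();
            _pending = inner;
        }

        // Assumption: run outside the lock so a long handler does not block
        // other threads that only want to enqueue their own work.
        task.RunSynchronously(TaskScheduler.Current);
        return inner;
    }
}
```

With this sketch, the re-entrant call from test #2 still queues behind the outer task, because _pending already points at the outer proxy by the time the handler runs.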
Updated: this has been further improved with cancel/restart logic.
Here is a solution that is worse in every aspect than the accepted answer, except for being thread-safe (which is not a requirement of the question). Disadvantages:
The executeOnCurrentContext configuration affects all lambdas (it's not a per-lambda configuration).
This solution uses an ActionBlock from the TPL Dataflow library as its processing engine.
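    using System;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public class AsyncOp<T>
    {
        // Processes the queued cold tasks one at a time, in posting order.
        private readonly ActionBlock<Task<Task<T>>> _actionBlock;

        public AsyncOp(bool executeOnCurrentContext = false)
        {
            var options = new ExecutionDataflowBlockOptions();
            if (executeOnCurrentContext)
                options.TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();

            _actionBlock = new ActionBlock<Task<Task<T>>>(async taskTask =>
            {
                try
                {
                    // Start the cold task, then wait for the inner task to finish.
                    taskTask.RunSynchronously();
                    await await taskTask;
                }
                catch { } // Ignore exceptions here; the caller observes them through the task returned by RunAsync
            }, options);
        }

        public Task<T> RunAsync(Func<Task<T>> taskFactory)
        {
            // Create the task without starting it; the block starts it when its turn comes.
            var taskTask = new Task<Task<T>>(taskFactory);
            if (!_actionBlock.Post(taskTask))
                throw new InvalidOperationException("Not accepted"); // Should never happen
            return taskTask.Unwrap();
        }
    }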