As Stephen Toub explained in this post, when you submit a message to an ActionBlock, you can call ExecutionContext.Capture before calling ActionBlock.Post, pass a DTO holding both the message and the captured ExecutionContext into the block, and then use ExecutionContext.Run inside the message-handling delegate to run the handler on the captured context:
public sealed class ContextFlowProcessor<T> {
// Envelope pairing a posted value with the ExecutionContext captured when it was posted.
private struct MessageState {
// Context captured in Post; the message handler checks it for null before running on it.
internal ExecutionContext Context;
// The posted item that is handed to the user-supplied action.
internal T Value;
}
// Target block that receives the MessageState envelopes created by Post.
private readonly ITargetBlock<MessageState> m_block;
/// <summary>
/// Creates a processor whose block invokes <paramref name="action"/> for every
/// posted item, running it under the execution context stored with the item
/// when one is present.
/// </summary>
/// <param name="action">Handler invoked once per posted item.</param>
public ContextFlowProcessor(Action<T> action) {
    // Adapter that lets ExecutionContext.Run hand the boxed item back to the typed handler.
    ContextCallback invokeHandler = payload => action((T)payload);

    m_block = new ActionBlock<MessageState>(message =>
    {
        ExecutionContext captured = message.Context;

        // No context travelled with the message: call the handler directly.
        if (captured == null)
        {
            action(message.Value);
            return;
        }

        // The context is disposed once the handler has run on it.
        using (captured)
        {
            ExecutionContext.Run(captured, invokeHandler, message.Value);
        }
    });
}
/// <summary>
/// Captures the caller's execution context and posts it to the block together
/// with <paramref name="item"/>.
/// </summary>
/// <param name="item">The value to hand to the processing action.</param>
/// <returns>
/// <c>true</c> if the block accepted the message; otherwise <c>false</c>.
/// </returns>
public bool Post(T item) {
    // The captured context may be null (the message handler explicitly checks for
    // that case), so it must not be dereferenced unconditionally.
    var ec = ExecutionContext.Capture();
    var rv = m_block.Post(new MessageState { Context = ec, Value = item });

    // A rejected message never reaches the handler, so the context captured for it
    // is released here; the null guard avoids a NullReferenceException.
    if (!rv && ec != null) ec.Dispose();
    return rv;
}
/// <summary>
/// Tells the underlying block that no further items will be posted.
/// </summary>
// IDataflowBlock.Complete() is the released TPL Dataflow name for what the
// pre-release API called DeclinePermanently().
public void Done() { m_block.Complete(); }
/// <summary>
/// Gets the task exposed by the underlying block to represent its completion.
/// </summary>
// IDataflowBlock.Completion is the released TPL Dataflow name for what the
// pre-release API called CompletionTask; the public property name is kept
// so existing callers are unaffected.
public Task CompletionTask { get { return m_block.Completion; } }
This works well when the logic inside the message handler is synchronous. But how can I run a piece of async logic on the captured ExecutionContext? I need something like this:
// Desired shape only - this does not compile: ExecutionContext.Run does not
// support asynchronous delegates, so its result cannot be awaited.
m_block = new ActionBlock<MessageState>(async ms =>
{
// omitting the null context situation for brevity
using (ms.Context)
{
await ExecutionContext.Run(ms.Context, async _ => { callSomethingAsync(ms.Value) });
}
});
Obviously, this doesn't compile because ExecutionContext.Run does not support asynchronous delegates (while ActionBlock does) - so how can I do this?
If you can provide a self-contained example so we can try to reproduce the problem, we might be able to provide a better answer. That said, it's possible to manually control the flow of ExecutionContext (or rather, a copy of it) across await continuations, using a simple custom synchronization context. Here is an example (warning: almost untested!):
// using EcFlowingSynchronizationContext:
m_block = new ActionBlock<MessageState>(async ms =>
{
    // Both the captured context and the synchronization context built from it
    // are disposed once the asynchronous handler has completed.
    using (ms.Context)
    using (var sc = new EcFlowingSynchronizationContext(ms.Context))
    {
        // Run takes a parameterless Func<Task>, so the lambda must not declare
        // a parameter (the previous "async _ =>" form cannot bind to it).
        await sc.Run(async () => { await callSomethingAsync(ms.Value); });
    }
});
// EcFlowingSynchronizationContext: flow execution context manually
/// <summary>
/// Synchronization context that runs every callback it receives under a copy of
/// an execution context captured at construction time.
/// </summary>
public class EcFlowingSynchronizationContext : SynchronizationContext, IDisposable
{
    private readonly ExecutionContext _ec;
    private readonly TaskScheduler _taskScheduler;

    /// <summary>
    /// Captures, while running under <paramref name="sourceEc"/> with this instance
    /// installed as the current synchronization context, both a task scheduler and
    /// a fresh execution context to be used for later callbacks.
    /// </summary>
    /// <param name="sourceEc">Execution context to derive the flowing context from.</param>
    public EcFlowingSynchronizationContext(ExecutionContext sourceEc)
    {
        ExecutionContext flowingContext = null;
        TaskScheduler boundScheduler = null;

        ContextCallback captureUnderSource = delegate
        {
            SynchronizationContext previous = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(this);
            try
            {
                boundScheduler = TaskScheduler.FromCurrentSynchronizationContext();
                // Capturing here also picks up SynchronizationContext.Current, so
                // later ExecutionContext.Run calls flow it as well.
                flowingContext = ExecutionContext.Capture();
            }
            finally
            {
                // Put back whatever synchronization context was current before.
                SynchronizationContext.SetSynchronizationContext(previous);
            }
        };

        ExecutionContext.Run(sourceEc, captureUnderSource, null);

        _taskScheduler = boundScheduler;
        _ec = flowingContext;
    }

    /// <summary>
    /// Invokes <paramref name="d"/> with <paramref name="state"/> under a copy of the
    /// stored execution context, disposing the copy afterwards.
    /// </summary>
    private void Execute(SendOrPostCallback d, object state)
    {
        ExecutionContext copy = _ec.CreateCopy();
        try
        {
            ExecutionContext.Run(copy, new ContextCallback(d), state);
        }
        finally
        {
            if (copy != null)
            {
                copy.Dispose();
            }
        }
    }

    /// <summary>
    /// Starts <paramref name="action"/> on the scheduler bound to this context and
    /// returns a task representing the asynchronous operation it produces.
    /// </summary>
    /// <param name="action">Asynchronous delegate to start.</param>
    /// <param name="token">Cancellation token passed to the task factory.</param>
    public Task Run(Func<Task> action, CancellationToken token = default(CancellationToken))
    {
        Task<Task> outer = Task.Factory.StartNew(action, token, TaskCreationOptions.None, _taskScheduler);
        return outer.Unwrap();
    }

    /// <summary>
    /// Starts <paramref name="action"/> on the scheduler bound to this context and
    /// returns a task representing the asynchronous operation and its result.
    /// </summary>
    /// <param name="action">Asynchronous delegate to start.</param>
    /// <param name="token">Cancellation token passed to the task factory.</param>
    public Task<TResult> Run<TResult>(Func<Task<TResult>> action, CancellationToken token = default(CancellationToken))
    {
        Task<Task<TResult>> outer = Task.Factory.StartNew(action, token, TaskCreationOptions.None, _taskScheduler);
        return outer.Unwrap();
    }

    /// <summary>
    /// Queues the callback to the thread pool, where it is executed under a copy
    /// of the stored execution context.
    /// </summary>
    public override void Post(SendOrPostCallback d, object state)
    {
        WaitCallback workItem = delegate(object queuedState) { Execute(d, queuedState); };
        ThreadPool.UnsafeQueueUserWorkItem(workItem, state);
    }

    /// <summary>
    /// Executes the callback on the calling thread under a copy of the stored
    /// execution context.
    /// </summary>
    public override void Send(SendOrPostCallback d, object state)
    {
        Execute(d, state);
    }

    /// <summary>Returns this same instance rather than a new copy.</summary>
    public override SynchronizationContext CreateCopy()
    {
        return this;
    }

    /// <summary>Disposes the execution context captured by the constructor.</summary>
    public void Dispose()
    {
        _ec.Dispose();
    }
}
Note that you should only store immutable values using CallContext.LogicalSetData (or AsyncLocal<T>). I.e., if you need to store something that may change during an asynchronous flow from a callee to the caller, and be able to track that change in the caller, make it a property of a class and then store an instance of that class. Make sure that class is thread-safe as well, because eventually you can have many concurrent forks of the original execution context.
For more details, refer to Stephen Cleary's excellent Implicit Async Context ("AsyncLocal") and "Eliding Async and Await".
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With