
How to run an async delegate on captured ExecutionContext

As Stephen Toub explained in this post, when you submit a message to an ActionBlock, you can call ExecutionContext.Capture before calling ActionBlock.Post, pass a DTO holding both the message and the captured ExecutionContext into the block, and then use ExecutionContext.Run inside the message-handling delegate to run the handler on the captured context:

public sealed class ContextFlowProcessor<T> {
    private struct MessageState {
        internal ExecutionContext Context;
        internal T Value;
    }

    private readonly ITargetBlock<MessageState> m_block;

    public ContextFlowProcessor(Action<T> action) {
        m_block = new ActionBlock<MessageState>(ms =>
        {
            if (ms.Context != null)
                using (ms.Context) ExecutionContext.Run(ms.Context, s => action((T)s), ms.Value);
            else 
                action(ms.Value);
        });
    }

    public bool Post(T item) {
        var ec = ExecutionContext.Capture();
        var rv = m_block.Post(new MessageState { Context = ec, Value = item });
        if (!rv) ec.Dispose();
        return rv;
    }

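    // Complete() and Completion are the current TPL Dataflow names for the
    // pre-release DeclinePermanently() and CompletionTask members.
    public void Done() { m_block.Complete(); }

    public Task CompletionTask { get { return m_block.Completion; } }
}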
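
To see what Capture and Run do in isolation, here is a minimal synchronous sketch. The class name `CaptureRunDemo` and the `Ambient` field are made up for illustration, and `AsyncLocal<T>` (available from .NET Framework 4.6) is assumed as the ambient state being flowed:

```csharp
using System;
using System.Threading;

public static class CaptureRunDemo
{
    // Hypothetical ambient value that travels with the ExecutionContext.
    private static readonly AsyncLocal<string> Ambient = new AsyncLocal<string>();

    public static string Run()
    {
        Ambient.Value = "at capture time";
        // Snapshot the current context, including the AsyncLocal value.
        ExecutionContext captured = ExecutionContext.Capture();

        // Change the ambient value after the snapshot was taken.
        Ambient.Value = "at run time";

        string seenInside = null;
        // The callback runs under the snapshot, so it reads the old value.
        ExecutionContext.Run(captured, _ => seenInside = Ambient.Value, null);

        // Outside the callback, the current context is back in effect.
        return seenInside + " | " + Ambient.Value;
    }
}
```

The callback passed to ExecutionContext.Run is a synchronous ContextCallback, which is exactly the limitation the rest of the question is about.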

This works well when the logic inside the message handler is synchronous. But how can I run a piece of async logic on the captured ExecutionContext? I need something like this:

m_block = new ActionBlock<MessageState>(async ms =>
{
      // omitting the null context situation for brevity
      using (ms.Context)
      {
         await ExecutionContext.Run(ms.Context, async _ => { await callSomethingAsync(ms.Value); }, null);
      }
});

Obviously, this doesn't compile: ExecutionContext.Run takes a synchronous ContextCallback and returns void, so there is nothing to await (ActionBlock, by contrast, does accept async delegates). So how can I do this?

asked Aug 14 '19 19:08 by Stop Putin Stop War



1 Answer

If you can provide a self-contained example so that we can try to reproduce the problem, we might be able to give a better answer. That said, you can manually control the flow of ExecutionContext (or rather, a copy of it) across await continuations by using a simple custom synchronization context. Here is an example (warning: almost untested!):

// using EcFlowingSynchronizationContext:

m_block = new ActionBlock<MessageState>(async ms =>
{
      using (ms.Context)
      using (var sc = new EcFlowingSynchronizationContext(ms.Context))
      {
         // Run takes a parameterless Func<Task>, so the lambda has no argument
         await sc.Run(async () => { await callSomethingAsync(ms.Value); });
      }
});

// EcFlowingSynchronizationContext: flow execution context manually 

public class EcFlowingSynchronizationContext : SynchronizationContext, IDisposable
{
    private readonly ExecutionContext _ec;
    private readonly TaskScheduler _taskScheduler;

    public EcFlowingSynchronizationContext(ExecutionContext sourceEc) 
    {
        TaskScheduler ts = null;
        ExecutionContext ec = null;

        ExecutionContext.Run(sourceEc, _ =>
        {
            var sc = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(this);
            try
            {
                ts = TaskScheduler.FromCurrentSynchronizationContext();
                // this will also capture SynchronizationContext.Current,
                // and it will be flown by subsequent ExecutionContext.Run
                ec = ExecutionContext.Capture();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(sc);
            }
        }, null);

        _ec = ec;
        _taskScheduler = ts;
    }

    private void Execute(SendOrPostCallback d, object state)
    {
        using (var ec = _ec.CreateCopy())
        {
            ExecutionContext.Run(ec, new ContextCallback(d), state);
        }
    }

    public Task Run(Func<Task> action, CancellationToken token = default(CancellationToken))
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, _taskScheduler).Unwrap();
    }

    public Task<TResult> Run<TResult>(Func<Task<TResult>> action, CancellationToken token = default(CancellationToken))
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, _taskScheduler).Unwrap();
    }

    public override void Post(SendOrPostCallback d, object state)
    {
        ThreadPool.UnsafeQueueUserWorkItem(s => Execute(d, s), state);
    }

    public override void Send(SendOrPostCallback d, object state)
    {
        Execute(d, state);
    }

    public override SynchronizationContext CreateCopy()
    {
        return this;
    }

    public void Dispose()
    {
        _ec.Dispose();
    }
}
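
For comparison, here is a sketch of a different and simpler technique than the synchronization context above. It relies on the fact that an async method captures the current ExecutionContext at each await and resumes under it, so starting the async method inside ExecutionContext.Run should be enough for its continuations to keep seeing the captured context. The names `CapturedContextRunner`, `RunAsync`, `Ambient` and `DemoAsync` are hypothetical and exist only for this illustration:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CapturedContextRunner
{
    // Hypothetical ambient value used to observe which context is active.
    public static readonly AsyncLocal<string> Ambient = new AsyncLocal<string>();

    // Starts asyncAction under the given context and hands back its Task.
    // Only the synchronous start happens inside ExecutionContext.Run; the
    // awaits inside asyncAction capture that context and flow it onward.
    public static Task RunAsync(ExecutionContext context, Func<Task> asyncAction)
    {
        Task task = null;
        ExecutionContext.Run(context, _ => { task = asyncAction(); }, null);
        return task;
    }

    public static async Task<string> DemoAsync()
    {
        Ambient.Value = "captured";
        ExecutionContext captured = ExecutionContext.Capture();
        Ambient.Value = "changed";

        string seenBefore = null, seenAfter = null;
        await RunAsync(captured, async () =>
        {
            seenBefore = Ambient.Value;   // runs synchronously inside Run
            await Task.Delay(10);
            seenAfter = Ambient.Value;    // continuation after the await
        });

        // The caller's own context is unaffected by the captured one.
        return seenBefore + "/" + seenAfter + "/" + Ambient.Value;
    }
}
```

Inside the ActionBlock handler from the question, this would amount to something like `await CapturedContextRunner.RunAsync(ms.Context, () => callSomethingAsync(ms.Value));`. Treat it as a sketch to verify against your target framework, not as a drop-in replacement for the class above.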

Note that you should only store immutable values with CallContext.LogicalSetData (or AsyncLocal<T>). That is, if you need to store something that may change during an asynchronous flow from a callee to the caller, and you want the caller to be able to observe that change, make it a property of a class and store an instance of that class. Make sure the class is thread-safe as well, because you can eventually end up with many concurrent forks of the original execution context.
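
The difference can be sketched as follows, assuming `AsyncLocal<T>`. The `Counter` holder and the `AmbientState` class are hypothetical names for this example. A value assigned to an AsyncLocal inside an awaited callee does not flow back to the caller, while a mutation of a shared, thread-safe holder object does:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical thread-safe holder; the AsyncLocal stores the reference,
// and the mutable state lives inside the instance.
public sealed class Counter
{
    private int _count;
    public int Count => Volatile.Read(ref _count);
    public void Increment() => Interlocked.Increment(ref _count);
}

public static class AmbientState
{
    public static readonly AsyncLocal<int> Plain = new AsyncLocal<int>();
    public static readonly AsyncLocal<Counter> Shared = new AsyncLocal<Counter>();

    private static async Task CalleeAsync()
    {
        await Task.Yield();
        Plain.Value = 42;          // changes only the callee's copy of the context
        Shared.Value.Increment();  // mutates the instance the caller also references
    }

    public static async Task<(int Plain, int Shared)> DemoAsync()
    {
        Plain.Value = 1;
        Shared.Value = new Counter();
        await CalleeAsync();
        // Plain is still 1 here; the counter reflects the callee's increment.
        return (Plain.Value, Shared.Value.Count);
    }
}
```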

For more details, refer to Stephen Cleary's excellent blog posts Implicit Async Context ("AsyncLocal") and Eliding Async and Await.

answered Oct 06 '22 14:10 by noseratio