Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to create a new context per every async operation and use it in a thread-safe manner for storing some data

I'm working on something like context-bound caching and I'm a little bit stuck on thread-safety...

Let's say I have the following code:

/// <summary>
/// <see cref="IContextualCacheAccessor"/> implementation that keeps its
/// <see cref="CacheScopesManager"/> in a static <see cref="AsyncLocal{T}"/> slot.
/// </summary>
public class AsynLocalContextualCacheAccessor : IContextualCacheAccessor
{
    private static readonly AsyncLocal<CacheScopesManager> _rCacheContextManager = new AsyncLocal<CacheScopesManager>();

    public AsynLocalContextualCacheAccessor()
    {
    }

    /// <summary>
    /// Returns the current scope of the manager stored in the async-local slot,
    /// creating and storing a new manager first when the slot is still empty.
    /// </summary>
    public CacheScope Current
    {
        get
        {
            CacheScopesManager manager = _rCacheContextManager.Value;
            if (manager == null)
            {
                // First access in this flow: build the manager, then publish it.
                manager = new CacheScopesManager();
                _rCacheContextManager.Value = manager;
            }

            return manager.Current;
        }
    }
}

/// <summary>
/// Keeps a stack of <see cref="CacheScope"/> instances in a static
/// <see cref="AsyncLocal{T}"/> slot and exposes the scope on top of that stack.
/// </summary>
public class CacheScopesManager
{
    private static readonly AsyncLocal<ImmutableStack<CacheScope>> _scopesStack = new AsyncLocal<ImmutableStack<CacheScope>>(OnValueChanged);

    /// <summary>
    /// Creates a new scope and stores a stack containing only that scope
    /// in the async-local slot.
    /// </summary>
    public CacheScopesManager()
    {
        // The scope is created before the slot is touched, as in the original ordering.
        CacheScope contextualCache = _NewScope();

        _scopesStack.Value = ImmutableStack.Create<CacheScope>().Push(contextualCache);
    }

    /// <summary>
    /// Gets the scope on top of the stack, or <c>null</c> when the stack is empty.
    /// Disposed scopes found on top are popped first. When the top scope was acquired
    /// by a different managed thread than the calling one, a new scope is created,
    /// pushed and returned instead.
    /// </summary>
    public CacheScope Current
    {
        get
        {
            if (_scopesStack.Value.IsEmpty)
                return null;

            CacheScope current = _scopesStack.Value.Peek();
            if (current.IsDisposed)
            {
                // Drop the dead scope and re-evaluate against the remaining stack.
                _scopesStack.Value = _scopesStack.Value.Pop();
                return Current;
            }

            // Create a new scope if we entered the new physical thread in the same logical thread
            // in order to update async local stack and automatically have a new scope per every logically new operation
            int currentThreadId = Thread.CurrentThread.ManagedThreadId;
            if (currentThreadId != current.AcquiredByThread)
            {
                current = _NewScope();
                _scopesStack.Value = _scopesStack.Value.Push(current);
            }

            return current;
        }
    }

    /// <summary>
    /// Change handler for the async-local stack. Reacts only to execution-context
    /// switches and disposes the top scope of the incoming stack when that stack
    /// holds more scopes than the outgoing one.
    /// </summary>
    /// <param name="args">Previous and current stack values plus the change reason.</param>
    private static void OnValueChanged(AsyncLocalValueChangedArgs<ImmutableStack<CacheScope>> args)
    {
        // Explicit assignments to Value are not interesting to us.
        if (!args.ThreadContextChanged)
            return;

        ImmutableStack<CacheScope> currentStack = args.CurrentValue;
        ImmutableStack<CacheScope> previousStack = args.PreviousValue;

        if (currentStack == null || previousStack == null)
            return;

        // No Peek() happens before this point: Peek() throws InvalidOperationException on an
        // empty stack, and a stack can be empty here after Current popped its last disposed scope.
        // Be sure in disposing of the scope
        // This situation means a comeback of the previous execution context, in case if in the previous scope Current was used.
        // A strictly larger count guarantees currentStack is non-empty, so Peek() is safe.
        if (currentStack.Count() > previousStack.Count())
            currentStack.Peek().Dispose();
    }
}

And I'm trying to satisfy the next test:

    // Exercises the accessor from one outer Task.Run flow that fans out into two
    // further Task.Run flows, and asserts what each flow can see through Current.
    [TestMethod]
    [TestCategory(TestCategoryCatalogs.UnitTest)]
    public async Task AsyncLocalCacheManagerAccessor_request_that_processed_by_more_than_by_one_thread_is_threadsafe()
    {
        IContextualCacheAccessor asyncLocalAccessor = new AsynLocalContextualCacheAccessor();
        Task requestAsyncFlow = Task.Run(async () =>
        {
            string key1 = "key1";
            string value1 = "value1";

            string key2 = "key2";
            string value2 = "value2";

            // First access to Current inside the outer flow.
            CacheScope scope1 = asyncLocalAccessor.Current;

            string initialKey = "k";
            object initialVal = new object();

            // Seed the outer scope; both child flows below expect to read this entry.
            scope1.Put(initialKey, initialVal);
            scope1.TryGet(initialKey, out object result1).Should().BeTrue();
            result1.Should().Be(initialVal);

            var parallel1 = Task.Run(async () =>
            {
                await Task.Delay(5);
                var cache = asyncLocalAccessor.Current;
                // The entry stored by the outer flow must be readable from here.
                cache.TryGet(initialKey, out object result2).Should().BeTrue();
                result2.Should().Be(initialVal);

                cache.Put(key1, value1);
                await Task.Delay(10);

                // Only key1 is expected among this scope's own items (not "k", not key2).
                cache.Items.Count.Should().Be(1);
                cache.TryGet(key1, out string result11).Should().BeTrue();
                result11.Should().Be(value1);
            });

            var parallel2 = Task.Run(async () =>
            {
                await Task.Delay(2);
                var cache = asyncLocalAccessor.Current;

                // The returned scope is discarded; the assertions below keep using 'cache'.
                cache.StartScope();

                cache.TryGet(initialKey, out object result3).Should().BeTrue();
                result3.Should().Be(initialVal);

                cache.Put(key2, value2);
                await Task.Delay(15);

                // Only key2 is expected among this scope's own items (not "k", not key1).
                cache.Items.Count.Should().Be(1);
                cache.TryGet(key2, out string result21).Should().BeTrue();
                result21.Should().Be(value2);
            });

            await Task.WhenAll(parallel1, parallel2);

            // Here is an implicit dependency on the SynchronizationContext: in most cases
            // the next code will be handled by a new thread, which will cause the creation of a new scope,
            // as it will for any other await inside any async operation, which is quite bad :(

            // After both children finished, only the single seeded entry is expected here.
            asyncLocalAccessor.Current.Items.Count.Should().Be(1);
            asyncLocalAccessor.Current.TryGet(initialKey, out object result4).Should().BeTrue();
            result4.Should().Be(initialVal);
        });

        await requestAsyncFlow;

        // Outside the Task.Run flow, Current is expected to expose a scope with no items.
        asyncLocalAccessor.Current.Items.Count.Should().Be(0);
    }

This test is actually green, but there is at least one problem. What I'm trying to achieve is to create a stack of scopes for every new async operation (if the current scope was accessed) and, when that operation finishes, to return cleanly to the previous stack. I implemented this based on the current thread ID (because I couldn't find any other way to do it automatically, and I don't like my solution at all), but if the continuation of an async operation is executed on a thread other than the initial one (an implicit dependency on the current SynchronizationContext), a new scope is created, which is very bad in my opinion.

I would be glad if someone could suggest how to do that correctly, big thanks! :)

UPD 1. Code updated to make every AsyncLocal field static: the value of every AsyncLocal is acquired from ExecutionContext.GetLocalValue(), which is static, so a non-static AsyncLocal is just redundant memory pressure.

UPD 2. Thanks, @weichch for the answer, since comment could be big, I just added additional info directly to the question. So, in my case logic with AsyncLocal stuff encapsulated, and what client of my code can do - it only invokes Current on IContextualCacheAccessor, which will get the instance of an object under AsyncLocal<CacheScopesManager>, AsyncLocal is used here just to have one instance of CacheScopesManager per logical request and share it across this request, similar to IoC-Container scoped lifecycle, but the lifecycle of such object is defined from the creation of the object until the end of the async flow where this object was created. Or let's think about ASP NET Core where we have IHttpContext, IHttpContext does not seem to be immutable, but is still used as AsyncLocal through IHttpContextAccessor, isn't it? Similar to this way CacheScopesManager was designed.

So, if client code, to get current CacheScope, can only invoke Current on IContextualCacheAccessor, then in case of AsyncLocal implementation of IContextualCacheAccessor call stack will fall into next code:

/// <summary>
/// Gets the live scope on top of the async-local stack, or <c>null</c> when no scope is left.
/// Disposed scopes on top are discarded; a scope acquired by another managed thread
/// is shadowed by a new scope that is pushed and returned.
/// </summary>
public CacheScope Current
{
    get
    {
        // Keep discarding disposed scopes until a live one is on top or the stack runs out.
        while (!_scopesStack.Value.IsEmpty)
        {
            CacheScope top = _scopesStack.Value.Peek();
            if (top.IsDisposed)
            {
                _scopesStack.Value = _scopesStack.Value.Pop();
                continue;
            }

            // Same physical thread as the one that acquired the scope: reuse it.
            if (Thread.CurrentThread.ManagedThreadId == top.AcquiredByThread)
                return top;

            // Different physical thread inside the same logical flow: push a new scope
            // so that this flow gets its own scope on its own copy of the stack.
            CacheScope fresh = _NewScope();
            _scopesStack.Value = _scopesStack.Value.Push(fresh);
            return fresh;
        }

        return null;
    }
}

and if another thread uses Current, a new scope is created; since the ImmutableStack<CacheScope> is held in an AsyncLocal, the stack of the previous async flow is protected from any changes, which means that when we return to that flow its stack is intact (provided, of course, that no hacks were used). All this was done to make the stack of scopes thread-safe, rather than to make it a real 'AsyncLocal'. So, your code

// Pushes a scope labelled "Method1", runs Method2 and Method3 concurrently,
// and pops the scope once both have completed.
async Task Method1()
{
    Cache.Push(new CacheScope { Value = "Method1" });

    // Start both children before awaiting so they overlap.
    Task second = Method2();
    Task third = Method3();
    await Task.WhenAll(second, third);

    Cache.Pop();
}

// After a short delay, overwrites Value on the scope at the top of the current stack
// and prints it.
async Task Method2()
{
    await Task.Delay(10);

    var top = Cache.CurrentStack.Peek();

    // Mutates the scope object itself, not the stack.
    top.Value = "Method2";

    Console.WriteLine($"Method2 - {top.Value}");
}

// After a short delay, prints Value of the scope at the top of the current stack
// without modifying it.
async Task Method3()
{
    await Task.Delay(10);

    var top = Cache.CurrentStack.Peek();

    Console.WriteLine($"Method3 - {top.Value}");
}

will not, if my accessor is used, cause a mutation in one async flow to be reflected in another (and adding data to the scope of the previous async flow before the thread switch is fine for me). But there is one problem: the purpose of this CacheScope is to provide storage that spans the logical request and caches some data; this data is scoped to the CacheScope and is dropped from reachable memory as soon as the scope ends. I want to minimize the creation of such scopes, which means that if the code executes sequentially there should be no reason to create a new scope, even if the continuation of some async operation runs on another thread, because logically the code is still 'sequential' and it's fine to share the same scope across such 'sequential' code. Please correct me if I'm wrong somewhere.

But your answer and explanation are really useful and for sure will protect others from making mistakes. Also, it helped me to understand what Stephen meant under:

If you do go down this route, I recommend writing lots and lots of unit tests.

My English is poor and I thought that 'route' meant 'link to the article'; now I understand that in that context it rather means 'way'.

UPD 3. Added some code of CacheScope for a better picture.

/// <summary>
/// A keyed item store with an optional parent scope. Lookups that miss in this
/// scope fall back to the parent; writes and removals only ever touch this scope.
/// </summary>
public class CacheScope : IDisposableExtended
{
    // Not readonly: both references are cleared in Dispose.
    private ICacheScopesManager _scopeManager;
    private CacheScope _parentScope;

    // Swapped to null by Dispose via Interlocked.Exchange.
    private Dictionary<string, object> _storage = new Dictionary<string, object>();


    /// <summary>Creates a scope bound to the given manager and optional parent.</summary>
    /// <param name="id">Identifier exposed through <see cref="Id"/>.</param>
    /// <param name="boundThreadId">Value exposed through <see cref="AcquiredByThread"/>.</param>
    /// <param name="scopeManager">Manager used by <see cref="StartScope"/>; must not be null.</param>
    /// <param name="parentScope">Scope consulted on lookup misses; may be null.</param>
    internal CacheScope(Guid id, int boundThreadId, ICacheScopesManager scopeManager,
        CacheScope parentScope)
    {
        _scopeManager = scopeManager.ThrowIfArgumentIsNull(nameof(scopeManager));

        Id = id;
        AcquiredByThread = boundThreadId;

        _parentScope = parentScope;
    }

    public Guid Id { get; }

    public int AcquiredByThread { get; }

    /// <summary>Values stored directly in this scope (parent items excluded); null after disposal.</summary>
    public IReadOnlyCollection<object> Items => _storage?.Values;

    public bool IsDisposed { get; private set; } = false;

    /// <summary>
    /// Removes <paramref name="key"/> from this scope and returns the removed item.
    /// </summary>
    /// <returns><c>true</c> when the key was present in this scope; otherwise <c>false</c>.</returns>
    /// <exception cref="InvalidCastException">The stored item is not a <typeparamref name="TItem"/>; the item is left in place.</exception>
    public bool TryExpire<TItem>(string key, out TItem expiredItem)
    {
        // NOTE(review): helper is defined outside this snippet; presumably throws when disposed.
        _AssertInstanceIsDisposed();

        key.ThrowIfArgumentIsNull(nameof(key));

        expiredItem = default(TItem);

        // Even if item is present in parent scope it cannot be expired from inner scope.
        if (!_storage.TryGetValue(key, out object stored))
            return false;

        // Cast before removing so a type mismatch leaves the entry untouched.
        expiredItem = (TItem)stored;

        _storage.Remove(key);

        return true;
    }


    /// <summary>
    /// Returns the item for <paramref name="key"/> from this scope or, failing that, from the
    /// parent scope. When neither has it, the item is produced by <paramref name="putFactory"/>
    /// and stored in this scope.
    /// </summary>
    public TItem GetOrPut<TItem>(string key, Func<string, TItem> putFactory)
    {
        _AssertInstanceIsDisposed();

        key.ThrowIfArgumentIsNull(nameof(key));
        putFactory.ThrowIfArgumentIsNull(nameof(putFactory));

        if (_storage.TryGetValue(key, out object stored))
            return (TItem)stored;

        // A parent hit is returned as-is and is not copied into this scope.
        if (_parentScope != null && _parentScope.TryGet(key, out TItem inherited))
            return inherited;

        TItem created = putFactory(key);

        _storage.Add(key, created);

        return created;
    }

    /// <summary>Stores <paramref name="item"/> under <paramref name="key"/> in this scope, replacing any existing entry.</summary>
    public void Put<TItem>(string key, TItem item)
    {
        _AssertInstanceIsDisposed();

        key.ThrowIfArgumentIsNull(nameof(key));

        _storage[key] = item;

        // We are not even thinking about to change the parent scope here,
        // because parent scope should be considered by current as immutable.
    }

    /// <summary>
    /// Looks up <paramref name="key"/> in this scope and then, on a miss, in the parent scope.
    /// </summary>
    /// <returns><c>true</c> when the key was found in this scope or an ancestor.</returns>
    public bool TryGet<TItem>(string key, out TItem item)
    {
        _AssertInstanceIsDisposed();

        key.ThrowIfArgumentIsNull(nameof(key));

        item = default(TItem);

        if (_storage.TryGetValue(key, out object stored))
        {
            item = (TItem)stored;
            return true;
        }

        return _parentScope != null && _parentScope.TryGet(key, out item);
    }

    /// <summary>
    /// Disposes every stored item that implements <see cref="IDisposable"/> and
    /// releases the parent and manager references. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (IsDisposed)
            return;

        Dictionary<string, object> localStorage = Interlocked.Exchange(ref _storage, null);
        if (localStorage == null)
        {
            // that should never happen but Dispose in general is expected to be safe to call so... let's obey the rules
            return;
        }

        try
        {
            foreach (var item in localStorage.Values)
            {
                if (item is IDisposable disposable)
                    disposable.Dispose();
            }
        }
        finally
        {
            // Runs even when an item's Dispose throws, so the scope never ends up with
            // null storage while still reporting IsDisposed == false.
            _parentScope = null;
            _scopeManager = null;

            IsDisposed = true;
        }
    }

    /// <summary>Asks the scope manager to create a scope that has this scope as its argument.</summary>
    public CacheScope StartScope() => _scopeManager.CreateScope(this);
}
like image 553
Alexander Tolstikov Avatar asked Jan 26 '23 02:01

Alexander Tolstikov


1 Answers

Your code is really fighting the way AsyncLocal<T> works. Setting in a getter, trying to manage the scopes manually, having an async local manager for an async local type, and the code using the change handler are all problematic.

I believe all this is really to try to deal with the fact that CacheScope isn't immutable. The best way to solve this is to make CacheScope a proper immutable object. Then everything else will fall into place more or less naturally.

I find it's often easier to write a separate static API for immutable objects which is more "async local-friendly". E.g.:

/// <summary>
/// Static cache API backed by an immutable stack of (key, value) entries held in an
/// <see cref="AsyncLocal{T}"/>. Every change replaces the stored stack instead of mutating it.
/// </summary>
public class ImplicitCache
{
  private static readonly AsyncLocal<ImmutableStack<(string, object)>> _asyncLocal = new AsyncLocal<ImmutableStack<(string, object)>>();

  // An empty stack is stored as null, and null is read back as an empty stack.
  private static ImmutableStack<(string, object)> CurrentStack
  {
    get => _asyncLocal.Value ?? ImmutableStack.Create<(string, object)>();
    set => _asyncLocal.Value = value.IsEmpty ? null : value;
  }

  // Separate API:

  /// <summary>
  /// Pushes an entry onto the current stack.
  /// </summary>
  /// <param name="key">Entry key; must not be null.</param>
  /// <param name="value">Entry value.</param>
  /// <returns>A handle that pops the top entry of the then-current stack when disposed.</returns>
  /// <exception cref="InvalidOperationException"><paramref name="key"/> is null.</exception>
  public static IDisposable Put(string key, object value)
  {
    if (key == null)
      throw new InvalidOperationException();
    CurrentStack = CurrentStack.Push((key, value));
    // NOTE(review): Disposable is not declared in this snippet; presumably a delegate-backed
    // IDisposable helper from an external library - confirm it is referenced.
    return new Disposable(() => CurrentStack = CurrentStack.Pop());
  }

  /// <summary>
  /// Finds the most recently pushed entry with the given key.
  /// </summary>
  /// <returns><c>true</c> when an entry was found; otherwise <c>false</c> with <paramref name="value"/> null.</returns>
  public static bool TryGet(string key, out object value)
  {
    // ImmutableStack enumerates from the top (newest) entry down, so the first match is the
    // innermost Put. Reversing first would let the oldest entry shadow newer ones.
    var result = CurrentStack.FirstOrDefault(x => x.Item1 == key);
    value = result.Item2;
    // Keys are never null, so a null Item1 can only be the default (not found) tuple.
    return result.Item1 != null;
  }
}

Usage:

// Usage example for ImplicitCache: entries put in an outer flow are visible in child flows,
// entries put in a child flow are removed when its handle is disposed, and nothing is
// visible once the outer flow has finished.
public async Task AsyncLocalCacheManagerAccessor_request_that_processed_by_more_than_by_one_thread_is_threadsafe()
{
  // Declared out here because the final assertion after the outer flow needs it too.
  string initialKey = "k";

  Task requestAsyncFlow = Task.Run(async () =>
  {
    string key1 = "key1";
    string value1 = "value1";

    string key2 = "key2";
    string value2 = "value2";

    object initialVal = new object();

    using var dispose1 = ImplicitCache.Put(initialKey, initialVal);
    ImplicitCache.TryGet(initialKey, out object result1).Should().BeTrue();
    result1.Should().Be(initialVal);

    var parallel1 = Task.Run(async () =>
    {
      await Task.Delay(5);
      ImplicitCache.TryGet(initialKey, out object result2).Should().BeTrue();
      result2.Should().Be(initialVal);

      using var dispose2 = ImplicitCache.Put(key1, value1);
      await Task.Delay(10);

      // TryGet's out parameter is typed object, so the result is received as object.
      ImplicitCache.TryGet(key1, out object result11).Should().BeTrue();
      result11.Should().Be(value1);
    });

    var parallel2 = Task.Run(async () =>
    {
      await Task.Delay(2);

      ImplicitCache.TryGet(initialKey, out object result3).Should().BeTrue();
      result3.Should().Be(initialVal);

      using var dispose3 = ImplicitCache.Put(key2, value2);
      await Task.Delay(15);

      ImplicitCache.TryGet(key2, out object result21).Should().BeTrue();
      result21.Should().Be(value2);
    });

    await Task.WhenAll(parallel1, parallel2);

    ImplicitCache.TryGet(initialKey, out object result4).Should().BeTrue();
    result4.Should().Be(initialVal);
  });

  await requestAsyncFlow;

  // The entry was put inside the Task.Run flow, so it must not be visible out here.
  ImplicitCache.TryGet(initialKey, out _).Should().BeFalse();
}
like image 86
Stephen Cleary Avatar answered Jan 30 '23 00:01

Stephen Cleary