Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Async version of Monitor.Pulse/Wait

I'm trying to optimize an async version of something similar (in basic funcionality) to the Monitor.Wait and Monitor.Pulse methods. The idea is to use this over an async method.

Requirements: 1) I have one Task running, that it is in charge of waiting until someone pulses my monitor. 2) That task may compute a complex (ie: time consuming) operation. In the meanwhile, the pulse method could be called several times without doing anything (as the main task is already doing some processing). 3) Once the main task finishes, it starts to Wait again until another Pulse comes in.

Worst case scenario is Wait>Pulse>Wait>Pulse>Wait..., but usually I have tenths/hundreds of pulses for every wait.

So, I have the following class (working, but I think it can be optimized a bit based on my requirements)

internal sealed class Awaiter
{
    private readonly ConcurrentQueue<TaskCompletionSource<byte>> _waiting = new ConcurrentQueue<TaskCompletionSource<byte>>();

    public void Pulse()
    {
        TaskCompletionSource<byte> tcs;
        if (_waiting.TryDequeue(out tcs))
        {
            tcs.TrySetResult(1);
        }
    }

    public Task Wait()
    {
        TaskCompletionSource<byte> tcs;
        if (_waiting.TryPeek(out tcs))
        {
            return tcs.Task;
        }

        tcs = new TaskCompletionSource<byte>();
        _waiting.Enqueue(tcs);
        return tcs.Task;
    }
}

The problem with the above class is the baggage I'm using just for synchronization. Since I will be waiting from one and only one thread, there is really no need to have a ConcurrentQueue, as I always have only one item in it.

So, I simplified it a bit and wrote the following:

internal sealed class Awaiter2
{
    private readonly object _mutex = new object();
    private TaskCompletionSource<byte> _waiting;

    public void Pulse()
    {
        var w = _waiting;
        if (w == null)
        {
            return;
        }

        lock (_mutex)
        {
            w = _waiting;
            if (w == null)
            {
                return;
            }

            _waiting = null;
            w.TrySetResult(1);
        }
    }

    public Task Wait()
    {
        var w = _waiting;
        if (w != null)
        {
            return w.Task;
        }

        lock (_mutex)
        {
            w = _waiting;
            if (w != null)
            {
                return w.Task;
            }

            w = _waiting = new TaskCompletionSource<byte>();
            return w.Task;
        }
    }
}

That new version is also working ok, but I'm still thinking it can be optimized a bit more, by removing the locks.

I'm looking for suggestions on how I can optimize the second version. Any ideas?

like image 363
CheloXL Avatar asked Jan 14 '16 15:01

CheloXL


People also ask

What does Monitor Wait do?

Wait(Object, TimeSpan, Boolean) Releases the lock on an object and blocks the current thread until it reacquires the lock. If the specified time-out interval elapses, the thread enters the ready queue.

What does the C# method monitor PulseAll () do?

Enter method does not protect variable x at all. What does the C# method Monitor. PulseAll() do? Move all threads in waiting state into ready state.

What is monitor pulse?

A heart rate monitor (HRM) is a personal monitoring device that allows one to measure/display heart rate in real time or record the heart rate for later study. It is largely used to gather heart rate data while performing various types of physical exercise.


3 Answers

If you don't need the Wait() call to return a Task but are content with being able to await Wait() then you can implement a custom awaiter/awaitable.

See this link for an overview of the await pattern used by the compiler.

When implementing custom awaitables you will just be dealing with delegates and the actual "waiting" is left up to you. When you want to "await" for a condition it is often possible to keep a list of pending continuations and whenever the condition comes true you can invoke those continuations. You just need to deal with the synchronization coming from the fact that await can be called from arbitrary threads. If you know that you'll only ever await from one thread (say the UI thread) then you don't need any synchronization at all!

I'll try to give you a lock-free implementation but no guarantees that it is correct. If you don't understand why all race conditions are safe you should not use it and implement the async/await protocol using lock-statements or other techniques which you know how to debug.

public sealed class AsyncMonitor
{
    private PulseAwaitable _currentWaiter;

    public AsyncMonitor()
    {
        _currentWaiter = new PulseAwaitable();
    }

    public void Pulse()
    {
        // Optimize for the case when calling Pulse() when nobody is waiting.
        //
        // This has an inherent race condition when calling Pulse() and Wait()
        // at the same time. The question this was written for did not specify
        // how to resolve this, so it is a valid answer to tolerate either
        // result and just allow the race condition.
        //
        if (_currentWaiter.HasWaitingContinuations)
            Interlocked.Exchange(ref _currentWaiter, new PulseAwaitable()).Complete();
    }

    public PulseAwaitable Wait()
    {
        return _currentWaiter;
    }
}

// This class maintains a list of waiting continuations to be executed when
// the owning AsyncMonitor is pulsed.
public sealed class PulseAwaitable : INotifyCompletion
{
    // List of pending 'await' delegates.
    private Action _pendingContinuations;

    // Flag whether we have been pulsed. This is the primary variable
    // around which we build the lock free synchronization.
    private int _pulsed;

    // AsyncMonitor creates instances as required.
    internal PulseAwaitable()
    {
    }

    // This check has a race condition which is tolerated.
    // It is used to optimize for cases when the PulseAwaitable has no waiters.
    internal bool HasWaitingContinuations
    {
        get { return Volatile.Read(ref _pendingContinuations) != null; }
    }

    // Called by the AsyncMonitor when it is pulsed.
    internal void Complete()
    {
        // Set pulsed flag first because that is the variable around which
        // we build the lock free protocol. Everything else this method does
        // is free to have race conditions.
        Interlocked.Exchange(ref _pulsed, 1);

        // Execute pending continuations. This is free to race with calls
        // of OnCompleted seeing the pulsed flag first.
        Interlocked.Exchange(ref _pendingContinuations, null)?.Invoke();
    }

    #region Awaitable

    // There is no need to separate the awaiter from the awaitable
    // so we use one class to implement both parts of the protocol.
    public PulseAwaitable GetAwaiter()
    {
        return this;
    }

    #endregion

    #region Awaiter

    public bool IsCompleted
    {
        // The return value of this property does not need to be up to date so we could omit the 'Volatile.Read' if we wanted to.
        // What is not allowed is returning "true" even if we are not completed, but this cannot happen since we never transist back to incompleted.
        get { return Volatile.Read(ref _pulsed) == 1; }
    }

    public void OnCompleted(Action continuation)
    {
        // Protected against manual invocations. The compiler-generated code never passes null so you can remove this check in release builds if you want to.
        if (continuation == null)
            throw new ArgumentNullException(nameof(continuation));

        // Standard pattern of maintaining a lock free immutable variable: read-modify-write cycle.
        // See for example here: https://blogs.msdn.microsoft.com/oldnewthing/20140516-00/?p=973
        // Again the 'Volatile.Read' is not really needed since outdated values will be detected at the first iteration.
        var oldContinuations = Volatile.Read(ref _pendingContinuations);
        for (;;)
        {
            var newContinuations = (oldContinuations + continuation);
            var actualContinuations = Interlocked.CompareExchange(ref _pendingContinuations, newContinuations, oldContinuations);
            if (actualContinuations == oldContinuations)
                break;

            oldContinuations = actualContinuations;
        }

        // Now comes the interesting part where the actual lock free synchronization happens.
        // If we are completed then somebody needs to clean up remaining continuations.
        // This happens last so the first part of the method can race with pulsing us.
        if (IsCompleted)
            Interlocked.Exchange(ref _pendingContinuations, null)?.Invoke();
    }

    public void GetResult()
    {
        // This is just to check against manual calls. The compiler will never call this when IsCompleted is false.
        // (Assuming your OnCompleted implementation is bug-free and you don't execute continuations before IsCompleted becomes true.)
        if (!IsCompleted)
            throw new NotSupportedException("Synchronous waits are not supported. Use 'await' or OnCompleted to wait asynchronously");
    }

    #endregion
}

You usually don't bother on which thread the continuations run because if they are async methods the compiler has already inserted code (in the continuation) to switch back to the right thread, no need to do it manually in every awaitable implementation.

[edit]

As a starting point for how a locking implementation can look I'll provide one using a lock-statement. It should be easy to replace it by a spinlock or some other locking technique. By using a struct as the awaitable it even has the advantage that it does no additional allocation except for the initial object. (There are of course allocations in the async/await framework in the compiler magic on the calling side, but you can't get rid of these.)

Note that the iteration counter will increment only for every Wait+Pulse pair and will eventually overflow into negative, but that is ok. We just need to bridge the time from the continuation beeing invoked until it can call GetResult. 4 billion Wait+Pulse pairs should be plenty of time for any pending continuations to call its GetResult method. If you don't want that risk you could use a long or Guid for a more unique iteration counter, but IMHO an int is good for almost all scenarios.

public sealed class AsyncMonitor
{
    public struct Awaitable : INotifyCompletion
    {
        // We use a struct to avoid allocations. Note that this means the compiler will copy
        // the struct around in the calling code when doing 'await', so for your own debugging
        // sanity make all variables readonly.
        private readonly AsyncMonitor _monitor;
        private readonly int _iteration;

        public Awaitable(AsyncMonitor monitor)
        {
            lock (monitor)
            {
                _monitor = monitor;
                _iteration = monitor._iteration;
            }
        }

        public Awaitable GetAwaiter()
        {
            return this;
        }

        public bool IsCompleted
        {
            get
            {
                // We use the iteration counter as an indicator when we should be complete.
                lock (_monitor)
                {
                    return _monitor._iteration != _iteration;
                }
            }
        }

        public void OnCompleted(Action continuation)
        {
            // The compiler never passes null, but someone may call it manually.
            if (continuation == null)
                throw new ArgumentNullException(nameof(continuation));

            lock (_monitor)
            {
                // Not calling IsCompleted since we already have a lock.
                if (_monitor._iteration == _iteration)
                {
                    _monitor._waiting += continuation;

                    // null the continuation to indicate the following code
                    // that we completed and don't want it executed.
                    continuation = null;
                }
            }

            // If we were already completed then we didn't null the continuation.
            // (We should invoke the continuation outside of the lock because it
            // may want to Wait/Pulse again and we want to avoid reentrancy issues.)
            continuation?.Invoke();
        }

        public void GetResult()
        {
            lock (_monitor)
            {
                // Not calling IsCompleted since we already have a lock.
                if (_monitor._iteration == _iteration)
                    throw new NotSupportedException("Synchronous wait is not supported. Use await or OnCompleted.");
            }
        }
    }

    private Action _waiting;
    private int _iteration;

    public AsyncMonitor()
    {
    }

    public void Pulse(bool executeAsync)
    {
        Action execute = null;

        lock (this)
        {
            // If nobody is waiting we don't need to increment the iteration counter.
            if (_waiting != null)
            {
                _iteration++;
                execute = _waiting;
                _waiting = null;
            }
        }

        // Important: execute the callbacks outside the lock because they might Pulse or Wait again.
        if (execute != null)
        {
            // If the caller doesn't want inlined execution (maybe he holds a lock)
            // then execute it on the thread pool.
            if (executeAsync)
                Task.Run(execute);
            else
                execute();
        }
    }

    public Awaitable Wait()
    {
        return new Awaitable(this);
    }
}
like image 173
Zarat Avatar answered Oct 05 '22 14:10

Zarat


Here is my simple async implementation that I use in my projects:

internal sealed class Pulsar
{
    private static TaskCompletionSource<bool> Init() => new TaskCompletionSource<bool>();

    private TaskCompletionSource<bool> _tcs = Init();

    public void Pulse()
    {
        Interlocked.Exchange(ref _tcs, Init()).SetResult(true);
    }

    public Task AwaitPulse(CancellationToken token)
    {
        return token.CanBeCanceled ? _tcs.Task.WithCancellation(token) : _tcs.Task;
    }
}

Add TaskCreationOptions.RunContinuationsAsynchronously to the TCS for async continuations.

The WithCancellation can be omitted of course, if you do not need cancellations.

like image 41
AgentFire Avatar answered Oct 05 '22 15:10

AgentFire


Because you only have one task ever waiting your function can be simplified to

internal sealed class Awaiter3
{
    private volatile TaskCompletionSource<byte> _waiting;

    public void Pulse()
    {
        var w = _waiting;
        if (w == null)
        {
            return;
        }
        _waiting = null;
#if NET_46_OR_GREATER
        w.TrySetResult(1);
#else
        Task.Run(() => w.TrySetResult(1));
#endif

    }

    //This method is not thread safe and can only be called by one thread at a time.
    // To make it thread safe put a lock around the null check and the assignment,
    // you do not need to have a lock on Pulse, "volatile" takes care of that side.
    public Task Wait()
    {
        if(_waiting != null)
            throw new InvalidOperationException("Only one waiter is allowed to exist at a time!");

#if NET_46_OR_GREATER
        _waiting = new TaskCompletionSource<byte>(TaskCreationOptions.RunContinuationsAsynchronously);
#else
        _waiting = new TaskCompletionSource<byte>();
#endif
        return _waiting.Task;
    }
}

One behavior I did change. If you are using .NET 4.6 or newer use the code in the #if NET_46_OR_GREATER blocks, if under use the else blocks. When you call TrySetResult you could have the continuation synchronously run, this can cause Pulse() to take a long time to complete. By using TaskCreationOptions.RunContinuationsAsynchronously in .NET 4.6 or wrapping the TrySetResult in a Task.Run for pre 4.6 will make sure that Puse() is not blocked by the continuation of the task.

See the SO question Detect target framework version at compile time on how to make a NET_46_OR_GREATER definition that works in your code.

like image 42
Scott Chamberlain Avatar answered Oct 05 '22 13:10

Scott Chamberlain