Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Task-based idle detection

It's not unusual to want a limit on the interval between certain events, and take action if the limit is exceeded. For example, a heartbeat message between network peers for detecting that the other end is alive.

In the C# async/await style, it is possible to implement that by replacing the timeout task each time the heartbeat arrives:

var client = new TcpClient { ... };
await client.ConnectAsync(...);

Task heartbeatLost = new Task.Delay(HEARTBEAT_LOST_THRESHOLD);
while (...)
{
    Task<int> readTask = client.ReadAsync(buffer, 0, buffer.Length);
    Task first = await Task.WhenAny(heartbeatLost, readTask);
    if (first == readTask) {
        if (ProcessData(buffer, 0, readTask.Result).HeartbeatFound) {
            heartbeatLost = new Task.Delay(HEARTBEAT_LOST_THRESHOLD);
        }
    }
    else if (first == heartbeatLost) {
        TellUserPeerIsDown();
        break;
    }
}

This is convenient, but each instance of the delay Task owns a Timer, and if many heartbeat packets arrive in less time than the threshold, that's a lot of Timer objects loading the threadpool. Also, the completion of each Timer will run code on the threadpool, whether there's any continuation still linked to it or not.

You can't free the old Timer by calling heartbeatLost.Dispose(); that'll give an exception

InvalidOperationException: A task may only be disposed if it is in a completion state

One could create a CancellationTokenSource and use it to cancel the old delay task, but it seems suboptimal to create even more objects to accomplish this, when timers themselves have the feature of being reschedulable.

What's the best way to integrate timer rescheduling, so that the code could be structured more like this?

var client = new TcpClient { ... };
await client.ConnectAsync(...);

var idleTimeout = new TaskDelayedCompletionSource(HEARTBEAT_LOST_THRESHOLD);
Task heartbeatLost = idleTimeout.Task;
while (...)
{
    Task<int> readTask = client.ReadAsync(buffer, 0, buffer.Length);
    Task first = await Task.WhenAny(heartbeatLost, readTask);
    if (first == readTask) {
        if (ProcessData(buffer, 0, readTask.Result).HeartbeatFound) {
            idleTimeout.ResetDelay(HEARTBEAT_LOST_THRESHOLD);
        }
    }
    else if (first == heartbeatLost) {
        TellUserPeerIsDown();
        break;
    }
}
like image 974
Ben Voigt Avatar asked Mar 27 '17 16:03

Ben Voigt


1 Answers

Seems pretty straightforward to me, The name of your hypothetical class gets you most of the way there. All you need is a TaskCompletionSource and a single timer you keep resetting.

public class TaskDelayedCompletionSource
{
    private TaskCompletionSource<bool> _completionSource;
    private readonly System.Threading.Timer _timer;
    private readonly object _lockObject = new object();

    public TaskDelayedCompletionSource(int interval)
    {
        _completionSource = CreateCompletionSource();
        _timer = new Timer(OnTimerCallback);
        _timer.Change(interval, Timeout.Infinite);
    }

    private static TaskCompletionSource<bool> CreateCompletionSource()
    {
        return new TaskCompletionSource<bool>(TaskCreationOptions.DenyChildAttach | TaskCreationOptions.RunContinuationsAsynchronously | TaskCreationOptions.HideScheduler);
    }

    private void OnTimerCallback(object state)
    {
        //Cache a copy of the completion source before we entier the lock, so we don't complete the wrong source if ResetDelay is in the middle of being called.
        var completionSource = _completionSource;
        lock (_lockObject)
        {
            completionSource.TrySetResult(true);
        }
    }

    public void ResetDelay(int interval)
    {
        lock (_lockObject)
        {
            var oldSource = _completionSource;
            _timer.Change(interval, Timeout.Infinite);
            _completionSource = CreateCompletionSource();
            oldSource.TrySetCanceled();
        }
    }
    public Task Task => _completionSource.Task;
}

This will only create a single timer and update it, the task completes when the timer fires.

You will need to change your code slightly, because a new TaskCompletionSource gets created every time you update the end time you need to put the Task heartbeatLost = idleTimeout.Task; call inside the while loop.

var client = new TcpClient { ... };
await client.ConnectAsync(...);

var idleTimeout = new TaskDelayedCompletionSource(HEARTBEAT_LOST_THRESHOLD);
while (...)
{
    Task heartbeatLost = idleTimeout.Task;
    Task<int> readTask = client.ReadAsync(buffer, 0, buffer.Length);
    Task first = await Task.WhenAny(heartbeatLost, readTask);
    if (first == readTask) {
        if (ProcessData(buffer, 0, readTask.Result).HeartbeatFound) {
            idleTimeout.ResetDelay(HEARTBEAT_LOST_THRESHOLD);
        }
    }
    else if (first == heartbeatLost) {
        TellUserPeerIsDown();
    }
}

EDIT: If you where conserened about the object creation of the completion sources (for example you are programming in a Game Engine where GC collection is a large consern) you may be able to add extra logic to OnTimerCallback and ResetDelay to reuse the completion source if the call has not happened yet and you know for sure you are not inside of a Reset Delay.

You will likely need to switch from using a lock to a SemaphoreSlim and change the callback to

    private void OnTimerCallback(object state)
    {
        if(_semaphore.Wait(0))
        {
            _completionSource.TrySetResult(true);
        }
    }

I may update this answer later to include what OnTimerCallback would have too, but I don't have time right now.

like image 112
Scott Chamberlain Avatar answered Oct 20 '22 13:10

Scott Chamberlain