SignalR core - invalidate dead connections

The problem

I'm using .NET Core 2.2 with ASP.NET Core SignalR. Currently I'm saving all connection states in a SQL database (see this document; even though it's a manual for the "old" SignalR library, the logic is the same). I'm also using a Redis backplane, since my application can scale horizontally.

However, when restarting my application, current connections do not get closed and will get orphaned. The previously linked article states:

If your web servers stop working or the application restarts, the OnDisconnected method is not called. Therefore, it is possible that your data repository will have records for connection ids that are no longer valid. To clean up these orphaned records, you may wish to invalidate any connection that was created outside of a timeframe that is relevant to your application.

The question

In the "old" SignalR there is an ITransportHeartbeat (which this script perfectly implements) but there's no such interface for the .NET Core version (at least, I couldn't find it).

How do I know whether a connection is no longer alive? I want (or actually need) to clean up old connection IDs.

asked Feb 06 '19 22:02 by Devator


2 Answers

Updated after @davidfowl's comments in the other answer.

.NET Core 2.1 with SignalR has IConnectionHeartbeatFeature which you can use to achieve something similar to what you could with ITransportHeartbeat in old SignalR.

The crux of the code below is an in-memory dictionary that tracks the connections whose last-seen time still needs to be written to the database. This lets us do the expensive database operations in batches at a controlled interval. IConnectionHeartbeatFeature.OnHeartbeat() is fired every second for each connection, so hitting the database at that frequency could take your server down at scale.

First, create a type that keeps an in-memory record of the connections that the server has yet to update:

public interface IConnectionCounter
{
    internal ConcurrentDictionary<string, DateTime> Connections { get; }

    public void RecordConnectionLastSeen(string connectionId);
    public void RemoveConnection(string connectionId);
}

/// <summary>
/// Maintains a dictionary of connections that need to be refreshed in the 
/// database
/// </summary>
public class ConnectionCounter : IConnectionCounter
{
    private readonly ConcurrentDictionary<string, DateTime> _connections;
    ConcurrentDictionary<string, DateTime> IConnectionCounter.Connections 
        => _connections;

    public ConnectionCounter()
    {
        _connections = new ConcurrentDictionary<string, DateTime>();
    }

    public void RecordConnectionLastSeen(string connectionId)
    {
        var now = DateTime.UtcNow;
        _connections.AddOrUpdate(
            connectionId, 
            now, (existingConnectionId, oldTime) => now);
    }

    public void RemoveConnection(string connectionId)
    {
        // TryRemove is ConcurrentDictionary's own atomic removal method.
        _connections.TryRemove(connectionId, out _);
    }
}

Note, this is NOT a definitive list of all online connections that need to be updated, as connections may be distributed across multiple servers. If you've got many servers, you could reduce the load further by storing these connections in a distributed in-memory store like Redis.

Next, set up the IConnectionCounter in the Hub so that connections are counted.

public class ChatHub : Hub
{
    private readonly IConnectionCounter _connectionCounter;

    public ChatHub(
        IConnectionCounter connectionCounter)
    {
        _connectionCounter = connectionCounter;
    }

    [AllowAnonymous]
    public override Task OnConnectedAsync()
    {
        var connectionHeartbeat = 
            Context.Features.Get<IConnectionHeartbeatFeature>();

        connectionHeartbeat.OnHeartbeat(connectionId => {
            _connectionCounter.RecordConnectionLastSeen((string)connectionId); 
        }, Context.ConnectionId);

        return base.OnConnectedAsync();
    }
}

Now create a service that takes the connections in IConnectionCounter and updates the database with the state of said connection:

public interface IPresenceDatabaseSyncer
{
    public Task UpdateConnectionsOnlineStatus();
}

/// <summary>
/// Writes the last-seen time of connections with pending heartbeats
/// to the database
/// </summary>
public class PresenceDatabaseSyncer : IPresenceDatabaseSyncer
{
    private readonly MyDbContext _context;
    private readonly IConnectionCounter _connectionCounter;

    public PresenceDatabaseSyncer(
        MyDbContext context, 
        IConnectionCounter connectionCounter)
    {
        _context = context;
        _connectionCounter = connectionCounter;
    }

    public async Task UpdateConnectionsOnlineStatus()
    {
        if (_connectionCounter.Connections.IsEmpty)
            return;

        foreach (var connection in _connectionCounter.Connections)
        {
            var connectionId = connection.Key;
            var lastPing = connection.Value;

            var dbConnection = _context.Connection
                .FirstOrDefault(x => x.ConnectionId == connectionId);

            if (dbConnection != null)
                dbConnection.LastPing = lastPing;

            _connectionCounter.RemoveConnection(connectionId);
        }

        // Persist the updated LastPing values in a single batch.
        await _context.SaveChangesAsync();
    }
}

I then use a HostedService to continuously run the db sync above:

/// <summary>
/// Runs a periodic sync operation to ensure that connections are 
/// recorded as being online correctly in the database
/// </summary>
public class PresenceDatabaseSyncerHostedService : IHostedService, IDisposable
{
    private const int SyncIntervalSeconds = 10;

    private readonly IServiceScopeFactory _serviceScopeFactory;
    private Timer _timer;

    public PresenceDatabaseSyncerHostedService(
        IServiceScopeFactory serviceScopeFactory)
    {
        _serviceScopeFactory = serviceScopeFactory;
    }

    public Task StartAsync(CancellationToken stoppingToken)
    {
        _timer = new Timer(
            DoWork, 
            null, 
            TimeSpan.Zero, 
            TimeSpan.FromSeconds(SyncIntervalSeconds));

        return Task.CompletedTask;
    }

    private async void DoWork(object state)
    {
        using var scope = _serviceScopeFactory.CreateScope();
        var scopedProcessingService = 
            scope.ServiceProvider.GetRequiredService<IPresenceDatabaseSyncer>();

        await scopedProcessingService.UpdateConnectionsOnlineStatus();
    }

    public Task StopAsync(CancellationToken stoppingToken)
    {
        _timer?.Change(Timeout.Infinite, 0);
        return Task.CompletedTask;
    }

    public void Dispose()
    {
        _timer?.Dispose();
    }
}

Finally register these dependencies and services:

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddSingleton<IConnectionCounter, ConnectionCounter>();
        services.AddScoped<IPresenceDatabaseSyncer, PresenceDatabaseSyncer>();

        services.AddHostedService<PresenceDatabaseSyncerHostedService>();
        // ...
    }
    
    // ...
}

Of course, there is still the matter of actually cleaning up the stale connections from the database. I handle this using another HostedService and will leave that as an exercise to the reader.
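As a possible starting point for that cleanup, here is a minimal sketch of the staleness check on its own. It is not the code from this answer: ConnectionRecord, StaleConnectionFinder, FindStale and the maxAge parameter are hypothetical names, and the row shape is assumed from the ConnectionId and LastPing fields mentioned in the answers.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical row shape: the answers only mention ConnectionId and LastPing.
public class ConnectionRecord
{
    public string ConnectionId { get; set; }
    public DateTime LastPing { get; set; }
}

public static class StaleConnectionFinder
{
    // Returns the records whose last heartbeat is older than maxAge,
    // measured against the supplied UTC "now".
    public static List<ConnectionRecord> FindStale(
        IEnumerable<ConnectionRecord> connections,
        DateTime utcNow,
        TimeSpan maxAge)
    {
        var cutoff = utcNow - maxAge;
        return connections.Where(c => c.LastPing < cutoff).ToList();
    }
}
```

Assuming MyDbContext is an EF Core context, the same predicate could be applied in a query inside a second hosted service, followed by RemoveRange and SaveChangesAsync. The maxAge value should be comfortably larger than the 10-second sync interval above, otherwise live connections would be removed between syncs.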

If you're using the Azure SignalR Service, there's an additional benefit over manually sending a KeepAlive message as per @Devator's answer in that you don't need to pay for the message (since OnHeartbeat occurs internally).

Keep in mind that this feature is not really documented that well. I've been using this in production for a few months now, but I haven't seen other solutions using this technique.

answered Sep 18 '22 14:09 by ajbeaven


The solution I came up with is as follows. It's not as elegant, but for now I see no other option.

I updated the model in the database to not only contain a ConnectionId but also a LastPing (which is a DateTime type). The client sends a KeepAlive message (custom message, not using the SignalR keepalive settings). Upon receiving the message (server side), I update the database with the current time:

var connection = _context.Connection.FirstOrDefault(x => x.Id == Context.ConnectionId);
if (connection != null) connection.LastPing = DateTime.UtcNow;
_context.SaveChanges(); // persist the new LastPing

To clean up the orphaned connections (which are not removed by SignalR's OnDisconnected method), I have a task running periodically (currently in Hangfire) which removes the connections where the LastPing field has not been updated recently.

answered Sep 19 '22 14:09 by Devator