I'm working on a program where I receive data from SignalR, perform processing, and then send a SignalR message back to the client once the processing has finished. I've found a couple of resources for how to do this, but I can't quite figure out how to implement it in my project.
Here's what my code looks like:
Bootstrapping
public static void Main(string[] args)
{
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
List<ISystem> systems = new List<ISystem>
{
new FirstProcessingSystem(),
new SecondProcessingSystem(),
};
Processor processor = new Processor(
cancellationToken: cancellationTokenSource.Token,
systems: systems);
processor.Start();
CreateHostBuilder(args).Build().Run();
cancellationTokenSource.Cancel();
}
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseStartup<Startup>();
});
public class Startup
{
// This method gets called by the runtime. Use this method to add services to the container.
// For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
public void ConfigureServices(IServiceCollection services)
{
services.AddSignalR();
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapHub<TestHub>("/testHub");
});
}
}
TestHub.cs
public class TestHub : Hub
{
public async Task DoStuff(Work work)
{
FirstProcessingSystem.ItemsToProcess.Add(work);
}
}
Work.cs
public class Work
{
public readonly string ConnectionId;
public readonly string Data;
public Work(string connectionId, string data)
{
ConnectionId = connectionId;
Data = data;
}
}
Processor.cs
public class Processor
{
readonly CancellationToken CancellationToken;
readonly List<ISystem> Systems;
public Processor(
CancellationToken cancellationToken,
List<ISystem> systems)
{
CancellationToken = cancellationToken;
Systems = systems;
}
public void Start()
{
Task.Run(() =>
{
while (!CancellationToken.IsCancellationRequested)
{
foreach (var s in Systems)
s.Process();
}
});
}
}
Systems
public interface ISystem
{
void Process();
}
public class FirstProcessingSystem : ISystem
{
public static ConcurrentBag<Work> ItemsToProcess = new ConcurrentBag<Work>();
public void Process()
{
while (!ItemsToProcess.IsEmpty)
{
Work work;
if (ItemsToProcess.TryTake(out work))
{
// Do things...
SecondProcessingSystem.ItemsToProcess.Add(work);
}
}
}
}
public class SecondProcessingSystem : ISystem
{
public static ConcurrentBag<Work> ItemsToProcess = new ConcurrentBag<Work>();
public void Process()
{
while (!ItemsToProcess.IsEmpty)
{
Work work;
if (ItemsToProcess.TryTake(out work))
{
// Do more things...
// Hub.Send(work.ConnectionId, "Finished");
}
}
}
}
I know that I can perform the processing in the Hub and then send back the "Finished" call, but I'd like to decouple my processing from my inbound messaging that way I can add more ISystems
when needed.
Can someone please with this? (Also, if someone has a better way to structure my program I'd also appreciate the feedback)
aspnet has a very powerful dependency injection system, why don't you use it? By creating your worker services without dependency injection, you'll have a hard time using anything provided by aspnet.
Since your "processing systems" seem to be long running services, you'd typically have them implement IHostedService
, then create a generic service starter (taken from here):
public class BackgroundServiceStarter<T> : IHostedService where T : IHostedService
{
readonly T _backgroundService;
public BackgroundServiceStarter(T backgroundService)
{
_backgroundService = backgroundService;
}
public Task StartAsync(CancellationToken cancellationToken)
{
return _backgroundService.StartAsync(cancellationToken);
}
public Task StopAsync(CancellationToken cancellationToken)
{
return _backgroundService.StopAsync(cancellationToken);
}
}
then register them to the DI container in ConfigureServices
:
// make the classes injectable
services.AddSingleton<FirstProcessingSystem>();
services.AddSingleton<SecondProcessingSystem>();
// start them up
services.AddHostedService<BackgroundServiceStarter<FirstProcessingSystem>>();
services.AddHostedService<BackgroundServiceStarter<SecondProcessingSystem>>();
Now that you got all that set up correctly, you can simply inject a reference to your signalR hub using IHubContext<TestHub> context
in the constructor parameters of whatever class that needs it (as described in some of the links you posted).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With