I have a simple ASP.NET Core Web API application that runs in a Docker container.
The app should process HTTP requests and consume events from Kafka. So, in the Startup.Configure method I start a dedicated thread with an infinite loop to consume events:
    public void Consume()
    {
        Task.Factory.StartNew(async () =>
        {
            try
            {
                while (true)
                {
                    var eventMsg = _consumer.Consume();
                    await Handle(eventMsg);
                }
            }
            catch (Exception ex)
            {
                _consumer.Close();
                // log error
                throw;
            }
        }, TaskCreationOptions.LongRunning);
    }
Then in Startup:
    public void Configure(IApplicationBuilder app, IHostingEnvironment env)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }
        else
        {
            app.UseHsts();
        }
        app.UseAuthentication();
        app.UseMvc();
        var consumer = app.ApplicationServices.GetRequiredService<KafkaConsumer>();
        consumer.Consume();
    }
But when an error occurs in the loop, I need to restart the container with the application. In a simple application, adding throw; produces an unhandled exception, the application closes, and as a result the container is restarted. In this case throw; doesn't help, because the exception is thrown on a background thread and nothing awaits the result. I'm not awaiting the result because when I add
    await consumer.Consume();
it blocks the whole application and the Web API never starts.
How do I handle this properly? In short, I need to throw an exception on a background thread and then shut down the app so that the container restarts automatically. I can't add Application.Exit to the Consume method, because it is in a library and I can't change its code.
Is there any other way to kill the app from a background thread?
Please let me know if I need to add more info before the question is closed.
The app should process HTTP requests and consume events from Kafka. So, in the Startup.Configure method I start a dedicated thread with an infinite loop to consume events.
The proper way to run a background service on ASP.NET Core is to use Hosted Services. So your code would look like this:
    public class KafkaConsumer : BackgroundService
    {
        public KafkaConsumer()
        {
            _consumer = ...;
        }

        // Task.Run keeps the synchronous Consume() call from holding up host startup.
        protected override Task ExecuteAsync(CancellationToken stoppingToken) => Task.Run(async () =>
        {
            try
            {
                // Leave the loop once the host starts shutting down.
                while (!stoppingToken.IsCancellationRequested)
                {
                    var eventMsg = _consumer.Consume();
                    await Handle(eventMsg);
                }
            }
            catch (Exception ex)
            {
                _consumer.Close();
                ... // log error
                throw;
            }
        });
    }
and it's registered in your startup class like this:
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddHostedService<KafkaConsumer>();
    }
But when an error occurs in the loop, I need to restart the container with the application.
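To do this, inject IHostApplicationLifetime and call StopApplication (on ASP.NET Core 2.x, where IHostApplicationLifetime is not available, inject IApplicationLifetime instead; it exposes the same StopApplication method):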
    public class KafkaConsumer : BackgroundService
    {
        private readonly IHostApplicationLifetime _hostApplicationLifetime;

        public KafkaConsumer(IHostApplicationLifetime hostApplicationLifetime)
        {
            _hostApplicationLifetime = hostApplicationLifetime;
            _consumer = ...;
        }

        // Task.Run keeps the synchronous Consume() call from holding up host startup.
        protected override Task ExecuteAsync(CancellationToken stoppingToken) => Task.Run(async () =>
        {
            try
            {
                // Leave the loop once the host starts shutting down.
                while (!stoppingToken.IsCancellationRequested)
                {
                    var eventMsg = _consumer.Consume();
                    await Handle(eventMsg);
                }
            }
            catch (Exception ex)
            {
                _consumer.Close();
                ... // log error
                // Ask the host to shut down gracefully so the container can be restarted.
                _hostApplicationLifetime.StopApplication();
                throw;
            }
        });
    }
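One thing to check with this approach: StopApplication triggers a graceful shutdown, so the process will typically exit with code 0. A Docker restart policy of always or unless-stopped restarts the container regardless, but on-failure only reacts to a non-zero exit code. Below is a minimal, self-contained sketch of one way to signal failure, assuming .NET Core 3.0 or later with the Microsoft.Extensions.Hosting package. FailingWorker and its simulated failure are hypothetical stand-ins for the Kafka consumer loop, not part of the original code.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

// Hypothetical worker that stands in for the Kafka consumer loop.
public sealed class FailingWorker : BackgroundService
{
    private readonly IHostApplicationLifetime _lifetime;

    public FailingWorker(IHostApplicationLifetime lifetime) => _lifetime = lifetime;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            // Simulate some work followed by an unrecoverable error.
            await Task.Delay(TimeSpan.FromMilliseconds(100), stoppingToken);
            throw new InvalidOperationException("simulated consumer failure");
        }
        catch (Exception ex) when (!(ex is OperationCanceledException))
        {
            Console.Error.WriteLine(ex);

            // Record a non-zero exit code before requesting shutdown,
            // so the container runtime sees the exit as a failure.
            Environment.ExitCode = 1;
            _lifetime.StopApplication();
        }
    }
}

public static class Program
{
    public static async Task<int> Main(string[] args)
    {
        using IHost host = Host.CreateDefaultBuilder(args)
            .ConfigureServices(services => services.AddHostedService<FailingWorker>())
            .Build();

        // RunAsync completes once StopApplication has been called
        // and all hosted services have stopped.
        await host.RunAsync();

        // Return the code set by the worker (0 if it never failed).
        return Environment.ExitCode;
    }
}
```

The exception is caught and logged here rather than rethrown, because the exit code already carries the failure signal. If you keep the throw; from the snippet above, set Environment.ExitCode before calling StopApplication in the same way.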