I'm writing a Windows Service that will kick off multiple worker threads that listen to Amazon SQS queues and process messages. There will be about 20 threads listening to 10 queues.
The threads will have to run continuously, which is why I'm leaning towards using dedicated threads for the worker loops rather than thread pool threads.
Here is a top-level implementation. The Windows service kicks off multiple worker threads, and each one listens to its queue and processes messages.
protected override void OnStart(string[] args)
{
    for (int i = 0; i < _workers; i++)
    {
        new Thread(RunWorker).Start();
    }
}
Here is the implementation of the worker:
public async void RunWorker()
{
    while (true)
    {
        // Get a message from Amazon SQS synchronously (about 20 ms).
        var message = sqsClient.ReceiveMessage();
        try
        {
            await PerformWebRequestAsync(message);
            await InsertIntoDbAsync(message);
        }
        catch (SomeException)
        {
            // ... log
            // continue to retry
            continue;
        }
        sqsClient.DeleteMessage();
    }
}
I know I can perform the same operation with Task.Run and execute it on a thread pool thread rather than starting an individual thread, but I don't see a reason for that since each thread will always be running.
Do you see any problems with this implementation? How reliable would it be to leave threads always running in this fashion and what can I do to make sure that each thread is always running?
Once the Windows service is started, this process will run continuously in the background.
The case against creating too many threads: a job takes longer to finish if we create thousands of threads, because time is spent switching between their contexts. Prefer the thread pool over creating threads manually, so that the pool can balance the number of threads for us.
One problem with your existing solution is that you call RunWorker in a fire-and-forget manner, albeit on a new thread (i.e., new Thread(RunWorker).Start()).
RunWorker is an async method, so it returns to the caller when execution hits the first await (i.e., await PerformWebRequestAsync(message)). If PerformWebRequestAsync returns a pending task, RunWorker returns and the new thread you just started terminates.
I don't think you need a new thread here at all; just use AmazonSQSClient.ReceiveMessageAsync and await its result. Another thing is that you shouldn't use async void methods unless you really don't care about tracking the state of the asynchronous task. Use async Task instead.
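To see the thread termination described above for yourself, here is a minimal sketch. It is not the original service code: the local RunWorker, the gate task (standing in for a pending PerformWebRequestAsync), and the class name AsyncVoidThreadDemo are all made up for illustration, and it assumes there is no synchronization context on the new thread, as in a Windows Service or console app.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncVoidThreadDemo
{
    public static (bool ThreadEndedAtAwait, bool ResumedOnThreadPool) Run()
    {
        // Hypothetical stand-in for a pending PerformWebRequestAsync task.
        var gate = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        var resumed = new ManualResetEventSlim(false);
        bool resumedOnThreadPool = false;

        // Same shape as the question's RunWorker: async void with an await.
        async void RunWorker()
        {
            await gate.Task; // pending, so control returns to the caller here
            resumedOnThreadPool = Thread.CurrentThread.IsThreadPoolThread;
            resumed.Set();
        }

        var thread = new Thread(RunWorker);
        thread.Start();

        // Join returns although the "work" has not finished:
        // the dedicated thread ended at the first pending await.
        thread.Join();
        bool threadEndedAtAwait = !resumed.IsSet;

        // Complete the pending task; the rest of RunWorker now runs elsewhere.
        gate.SetResult(true);
        resumed.Wait();

        return (threadEndedAtAwait, resumedOnThreadPool);
    }
}
```

Under those assumptions both values should come back true: the dedicated thread is gone before the awaited work completes, and the remainder of the method resumes on a thread pool thread anyway, so the explicit thread buys you nothing.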
Your code might look like this:
// Tasks returned by the running workers, plus the source used to cancel them.
List<Task> _workers = new List<Task>();
CancellationTokenSource _cts = new CancellationTokenSource();

protected override void OnStart(string[] args)
{
    for (int i = 0; i < _MAX_WORKERS; i++)
    {
        // Each call runs until its first pending await, then returns a Task.
        _workers.Add(RunWorkerAsync(_cts.Token));
    }
}

public async Task RunWorkerAsync(CancellationToken token)
{
    while (true)
    {
        token.ThrowIfCancellationRequested();
        // Get a message from Amazon SQS asynchronously.
        var message = await sqsClient.ReceiveMessageAsync().ConfigureAwait(false);
        try
        {
            await PerformWebRequestAsync(message);
            await InsertIntoDbAsync(message);
        }
        catch (SomeException)
        {
            // ... log
            // continue to retry
            continue;
        }
        sqsClient.DeleteMessage();
    }
}
Now, to stop all pending workers, you could simply do this (from the main "request dispatcher" thread):
_cts.Cancel();
try
{
    Task.WaitAll(_workers.ToArray());
}
catch (AggregateException ex)
{
    ex.Handle(inner => inner is OperationCanceledException);
}
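For what it's worth, here is a self-contained sketch of that start/stop pattern that you can run outside a service. WorkerHost, Start, and Stop are hypothetical names (in the service they would map to OnStart and OnStop), and Task.Delay is only a stand-in for a long-polling receive. The one thing it adds is passing the token into the awaited call, on the assumption that you want a pending receive to be interrupted rather than waiting for the next loop iteration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public sealed class WorkerHost
{
    private readonly List<Task> _workers = new List<Task>();
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();

    // Would be called from OnStart in the service.
    public void Start(int workerCount)
    {
        for (int i = 0; i < workerCount; i++)
        {
            _workers.Add(RunWorkerAsync(_cts.Token));
        }
    }

    // Would be called from OnStop. Returns true if every worker
    // ended (by cancellation) within the timeout.
    public bool Stop(TimeSpan timeout)
    {
        _cts.Cancel();
        try
        {
            return Task.WaitAll(_workers.ToArray(), timeout);
        }
        catch (AggregateException ex)
        {
            // Cancellation is expected; anything else is rethrown by Handle.
            ex.Handle(inner => inner is OperationCanceledException);
            return true;
        }
    }

    private static async Task RunWorkerAsync(CancellationToken token)
    {
        while (true)
        {
            token.ThrowIfCancellationRequested();
            // Stand-in for a long-polling receive. Passing the token means
            // cancellation interrupts the wait instead of letting it finish.
            await Task.Delay(TimeSpan.FromSeconds(20), token).ConfigureAwait(false);
        }
    }
}
```

With the token passed down, Stop should return almost immediately even though each worker is sitting in a 20-second wait.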
Note that ConfigureAwait(false) is optional for a Windows Service, because by default there is no synchronization context on the initial thread. However, I'd keep it that way to make the code independent of the execution environment (for cases where there is a synchronization context).
Finally, if for some reason you cannot use ReceiveMessageAsync, or you need to call another blocking API, or simply do a piece of CPU-intensive work at the beginning of RunWorkerAsync, just wrap it with Task.Run (as opposed to wrapping the whole RunWorkerAsync):
var message = await Task.Run(
    () => sqsClient.ReceiveMessage()).ConfigureAwait(false);
Well, for one, I'd use a CancellationTokenSource instantiated in the service and passed down to the workers. Your while statement would become:
while (!cancellationTokenSource.IsCancellationRequested)
{
    // rest of the code
}
This way you can cancel all your workers from the OnStop service method.
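If you do stay with dedicated threads and a synchronous loop, the cancellation idea above might look roughly like this. It is only a sketch: ThreadWorkerHost and its members are hypothetical names, and the timed wait on the token's wait handle stands in for the blocking receive and processing.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public sealed class ThreadWorkerHost
{
    private readonly List<Thread> _threads = new List<Thread>();
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private int _iterations;

    // Total loop iterations across all workers (for illustration only).
    public int Iterations => Volatile.Read(ref _iterations);

    public bool AnyAlive => _threads.Exists(t => t.IsAlive);

    // Would be called from OnStart in the service.
    public void Start(int workerCount)
    {
        for (int i = 0; i < workerCount; i++)
        {
            var thread = new Thread(() => RunWorker(_cts.Token)) { IsBackground = true };
            _threads.Add(thread);
            thread.Start();
        }
    }

    // Would be called from OnStop: signal cancellation, then wait for each thread.
    public void Stop()
    {
        _cts.Cancel();
        foreach (var thread in _threads)
        {
            thread.Join();
        }
    }

    private void RunWorker(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            // Stand-in for a blocking receive plus processing. Waiting on the
            // token's handle lets cancellation wake the thread early.
            token.WaitHandle.WaitOne(TimeSpan.FromMilliseconds(50));
            Interlocked.Increment(ref _iterations);
        }
    }
}
```

Note the worker here is a plain void method with no await, so the thread really does stay alive for the whole loop, unlike the async void version in the question.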
Additionally, you should watch for exceptions such as ThreadStateException, ThreadInterruptedException, or ThreadStartException, any of which might be thrown. Handle them and restart the worker if one occurs. Other than that, there's no reason why those worker threads can't run for as long as the service runs (days, weeks, months at a time).
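The "restart the worker if it fails" idea can be sketched as a small wrapper. Note this is a task-based variant rather than the thread-based handling described above, and WorkerSupervisor, RunWithRestartAsync, and the restartDelay parameter are names I made up for illustration. It assumes any exception other than a requested cancellation should trigger a restart.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WorkerSupervisor
{
    // Runs the worker and restarts it after a delay whenever it fails.
    // Stops when the worker ends normally or cancellation is requested.
    public static async Task RunWithRestartAsync(
        Func<CancellationToken, Task> worker,
        TimeSpan restartDelay,
        CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            try
            {
                await worker(token).ConfigureAwait(false);
                return; // worker finished on its own
            }
            catch (OperationCanceledException) when (token.IsCancellationRequested)
            {
                return; // shutdown was requested
            }
            catch (Exception ex)
            {
                // Replace with real logging in the service.
                Console.Error.WriteLine("Worker failed, restarting: " + ex.Message);
            }

            try
            {
                // Back off briefly before restarting the worker.
                await Task.Delay(restartDelay, token).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                return; // cancelled while waiting to restart
            }
        }
    }
}
```

In OnStart you could then add WorkerSupervisor.RunWithRestartAsync(RunWorkerAsync, someDelay, _cts.Token) to the worker list instead of calling RunWorkerAsync directly, so a single unexpected exception doesn't quietly end a worker for the rest of the service's lifetime.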