I have already used BackgroundWorker and Task to do something in the background and then post the result back to the UI. I even used BackgroundWorker and ReportProgress with an endless loop (apart from cancellation) to continuously post things to the UI thread.
But this time I need a more controllable scenario:
The background thread continuously polls other systems. With Invoke it can send updates to the UI. But how can the UI send messages to the background thread, such as changed settings?
In fact I am asking for the best .NET practice to have a worker thread with these specifics:
(Invoke, Dispatch)
In my scenario I still use WinForms, but I guess it should not matter? I will convert the application to WPF later.
Which best practice do you suggest?
I would use the TPL and a custom task scheduler for this, similar to Stephen Toub's StaTaskScheduler. That is what WorkerWithTaskScheduler implements below. In this case, the worker thread is also a task scheduler, which can run arbitrary Task items (with ExecutePendingTasks) while doing the work on its main loop.
Executing a lambda wrapped as a TPL Task on the worker thread's context is a very convenient way to send the worker thread a message and get back the result. This can be done synchronously with WorkerWithTaskScheduler.Run().Wait/Result or asynchronously with await WorkerWithTaskScheduler.Run(). Note how ContinueExecution and WaitForPendingTasks are used to pause/resume/end the worker's main loop. I hope the code is self-explanatory, but let me know if I should clarify anything.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Console_21628490
{
    // Test
    class Program
    {
        static async Task DoWorkAsync()
        {
            Console.WriteLine("Initial thread: " + Thread.CurrentThread.ManagedThreadId);

            // the worker thread lambda
            Func<WorkerWithTaskScheduler<int>, int> workAction = (worker) =>
            {
                var result = 0;
                Console.WriteLine("Worker thread: " + Thread.CurrentThread.ManagedThreadId);

                while (worker.ContinueExecution)
                {
                    // observe cancellation
                    worker.Token.ThrowIfCancellationRequested();

                    // execute pending tasks scheduled with WorkerWithTaskScheduler.Run
                    worker.ExecutePendingTasks();

                    // do the work item
                    Thread.Sleep(200); // simulate work payload
                    result++;

                    Console.Write("\rDone so far: " + result);
                    if (result > 100)
                        break; // done after 100 items
                }
                return result;
            };

            try
            {
                // cancel in 30s
                var cts = new CancellationTokenSource(30000);

                // start the worker
                var worker = new WorkerWithTaskScheduler<int>(workAction, cts.Token);

                // pause upon Enter
                Console.WriteLine("\nPress Enter to pause...");
                Console.ReadLine();
                worker.WaitForPendingTasks = true;

                // resume upon Enter
                Console.WriteLine("\nPress Enter to resume...");
                Console.ReadLine();
                worker.WaitForPendingTasks = false;

                // send a "message", i.e. run a lambda inside the worker thread
                var response = await worker.Run(() =>
                {
                    // do something in the context of the worker thread
                    return Thread.CurrentThread.ManagedThreadId;
                }, cts.Token);
                Console.WriteLine("\nReply from Worker thread: " + response);

                // End upon Enter
                Console.WriteLine("\nPress Enter to stop...");
                Console.ReadLine();

                // set ContinueExecution to false to get the result gracefully
                worker.ContinueExecution = false; // or worker.Cancel() to throw
                var result = await worker.WorkerTask;

                Console.Write("\nFinished, result: " + result);
            }
            catch (Exception ex)
            {
                while (ex is AggregateException)
                    ex = ex.InnerException;
                Console.WriteLine(ex.Message);
            }
        }

        static void Main(string[] args)
        {
            DoWorkAsync().Wait();
            Console.WriteLine("\nPress Enter to Exit.");
            Console.ReadLine();
        }
    }

    //
    // WorkerWithTaskScheduler
    //
    public class WorkerWithTaskScheduler<TResult> : TaskScheduler, IDisposable
    {
        readonly CancellationTokenSource _workerCts;
        Task<TResult> _workerTask;

        readonly BlockingCollection<Task> _pendingTasks;
        Thread _workerThread;

        volatile bool _continueExecution = true;
        volatile bool _waitForTasks = false;

        // start the main loop
        public WorkerWithTaskScheduler(
            Func<WorkerWithTaskScheduler<TResult>, TResult> executeMainLoop,
            CancellationToken token)
        {
            _pendingTasks = new BlockingCollection<Task>();
            _workerCts = CancellationTokenSource.CreateLinkedTokenSource(token);

            _workerTask = Task.Factory.StartNew<TResult>(() =>
            {
                _workerThread = Thread.CurrentThread;
                return executeMainLoop(this);
            }, _workerCts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }

        // a sample action for WorkerWithTaskScheduler constructor
        public static void ExecuteMainLoop(WorkerWithTaskScheduler<TResult> worker)
        {
            // loop while ContinueExecution is true (it starts out as true)
            while (worker.ContinueExecution)
            {
                worker.Token.ThrowIfCancellationRequested();
                worker.ExecutePendingTasks();
            }
        }

        // get the Task
        public Task<TResult> WorkerTask
        {
            get { return _workerTask; }
        }

        // get CancellationToken
        public CancellationToken Token
        {
            get { return _workerCts.Token; }
        }

        // check/set if the main loop should continue
        public bool ContinueExecution
        {
            get { return _continueExecution; }
            set { _continueExecution = value; }
        }

        // request cancellation
        public void Cancel()
        {
            _workerCts.Cancel();
        }

        // check if we're on the correct thread
        public void VerifyWorkerThread()
        {
            if (Thread.CurrentThread != _workerThread)
                throw new InvalidOperationException("Invalid thread.");
        }

        // check if the worker task itself is still alive
        public void VerifyWorkerTask()
        {
            if (_workerTask == null || _workerTask.IsCompleted)
                throw new InvalidOperationException("The worker thread has ended.");
        }
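
        // make ExecutePendingTasks block or not block
        public bool WaitForPendingTasks
        {
            get { return _waitForTasks; }
            set
            {
                _waitForTasks = value;
                // when switching back to non-blocking mode, queue a no-op task
                // so that a worker blocked in Take() wakes up and resumes
                if (!value)
                    Run(() => { }, this.Token);
            }
        }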

        // execute all pending tasks and return
        public void ExecutePendingTasks()
        {
            VerifyWorkerThread();

            while (this.ContinueExecution)
            {
                this.Token.ThrowIfCancellationRequested();

                Task item;
                if (_waitForTasks)
                {
                    item = _pendingTasks.Take(this.Token);
                }
                else
                {
                    if (!_pendingTasks.TryTake(out item))
                        break;
                }

                TryExecuteTask(item);
            }
        }

        //
        // TaskScheduler methods
        //
        protected override void QueueTask(Task task)
        {
            _pendingTasks.Add(task);
        }

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return _pendingTasks.ToArray();
        }

        protected override bool TryExecuteTaskInline(
            Task task, bool taskWasPreviouslyQueued)
        {
            return _workerThread == Thread.CurrentThread &&
                TryExecuteTask(task);
        }

        public override int MaximumConcurrencyLevel
        {
            get { return 1; }
        }

        public void Dispose()
        {
            if (_workerTask != null)
            {
                _workerCts.Cancel();
                _workerTask.Wait();
                _pendingTasks.Dispose();
                _workerTask = null;
            }
        }

        //
        // Task.Factory.StartNew wrappers using this task scheduler
        //
        public Task Run(Action action, CancellationToken token)
        {
            VerifyWorkerTask();
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this);
        }

        public Task<T> Run<T>(Func<T> action, CancellationToken token)
        {
            VerifyWorkerTask();
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this);
        }

        public Task Run(Func<Task> action, CancellationToken token)
        {
            VerifyWorkerTask();
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
        }

        public Task<T> Run<T>(Func<Task<T>> action, CancellationToken token)
        {
            VerifyWorkerTask();
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
        }
    }
}
To implement worker-to-client notifications, you can use the IProgress<T> pattern.
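A minimal sketch of that IProgress<T> idea, not part of the worker class above. The names Poller, SynchronousProgress and ProgressDemo are made up for illustration. In a WinForms or WPF app you would normally create a Progress<T> on the UI thread so its handler runs there; in a console app there is no synchronization context and Progress<T> would call back on the thread pool, so this sketch uses a small synchronous stand-in to keep the output ordered.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical worker: loops on a background thread and reports each result.
public static class Poller
{
    public static Task<int> RunAsync(
        IProgress<int> progress, int iterations, CancellationToken token)
    {
        return Task.Run(() =>
        {
            var count = 0;
            for (var i = 0; i < iterations; i++)
            {
                token.ThrowIfCancellationRequested();
                count++;                // stand-in for polling another system
                progress.Report(count); // worker-to-client notification
            }
            return count;
        }, token);
    }
}

// Hypothetical console stand-in for Progress<T>: it invokes the handler
// synchronously on the reporting thread instead of posting to a UI context.
public sealed class SynchronousProgress<T> : IProgress<T>
{
    private readonly Action<T> _handler;
    public SynchronousProgress(Action<T> handler) { _handler = handler; }
    public void Report(T value) { _handler(value); }
}

public static class ProgressDemo
{
    public static async Task<List<int>> RunDemoAsync()
    {
        var received = new List<int>();
        // In a UI app, replace this with new Progress<int>(handler),
        // constructed on the UI thread.
        var progress = new SynchronousProgress<int>(v =>
        {
            lock (received) received.Add(v);
        });
        await Poller.RunAsync(progress, 3, CancellationToken.None);
        return received;
    }

    public static void Main()
    {
        var values = RunDemoAsync().GetAwaiter().GetResult();
        Console.WriteLine(string.Join(",", values)); // prints "1,2,3"
    }
}
```

The worker only sees the IProgress<int> interface, so it does not need to know whether the receiver is a WinForms control, a WPF view model, or a test.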
The first thing that comes to mind, and the cleanest approach IMO, is to make the continuously running background method an instance method of a class. That class instance can then expose properties and methods that let others (e.g. the UI) change its state. Some locking may be required, since state is read and updated from different threads.
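Here is a rough sketch of that approach, under my own assumptions: PollingWorker, Interval and PollCount are invented names, and the "polling" is just a counter. The loop re-reads the setting on every pass, and every access to shared state goes through the same lock.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical poller: the loop is an instance method, and the settings
// it depends on are exposed as properties guarded by a lock.
public sealed class PollingWorker
{
    private readonly object _sync = new object();
    private TimeSpan _interval = TimeSpan.FromMilliseconds(50);
    private int _pollCount;

    // Set from the UI thread to change the polling interval while running.
    public TimeSpan Interval
    {
        get { lock (_sync) return _interval; }
        set { lock (_sync) _interval = value; }
    }

    // Read from any thread to see how many polls have run so far.
    public int PollCount
    {
        get { lock (_sync) return _pollCount; }
    }

    // The continuously running background method.
    public void Run(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            lock (_sync) _pollCount++; // stand-in for polling another system

            // Re-read the setting on each pass so changes take effect;
            // waiting on the token's handle ends the delay early on cancel.
            token.WaitHandle.WaitOne(Interval);
        }
    }
}

public static class PollingDemo
{
    public static void Main()
    {
        var worker = new PollingWorker();
        using (var cts = new CancellationTokenSource())
        {
            var task = Task.Factory.StartNew(
                () => worker.Run(cts.Token),
                CancellationToken.None,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default);

            worker.Interval = TimeSpan.FromMilliseconds(10); // "UI" changes a setting
            Thread.Sleep(200);

            cts.Cancel();
            task.Wait(); // Run returns normally once cancellation is observed
            Console.WriteLine("Polls completed: " + worker.PollCount);
        }
    }
}
```

This is simpler than a custom task scheduler, but it only covers settings that the loop reads on its own schedule. It does not give you a reply from the worker thread the way queued tasks do.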