Background thread loop and two-way-communication

I have already used BackgroundWorker and Task to do work in the background and then post the result back to the UI. I have even used BackgroundWorker with ReportProgress in an endless loop (apart from cancellation) to continuously post updates to the UI thread.

But this time I need a more controllable scenario: the background thread continuously polls other systems. With Invoke it can send updates to the UI. But how can the UI send messages to the background thread, such as changed settings?

In fact, I am asking for the best .NET practice for a worker thread with these characteristics:

  • Runs in background, does not block UI
  • Can send updates to UI (Invoke, Dispatch)
  • Runs in endless loop but can be paused, resumed and stopped in a proper way
  • UI thread can send updated settings to the background thread

In my scenario I still use WinForms but I guess it should not matter? I will convert the application to WPF later.

Which best practice do you suggest?

ZoolWay asked Feb 22 '14 21:02


2 Answers

I would use TPL and a custom task scheduler for this, similar to Stephen Toub's StaTaskScheduler. That's what WorkerWithTaskScheduler implements below. In this case, the worker thread is also a task scheduler, which can run arbitrary Task items (with ExecutePendingTasks) while doing the work on its main loop.

Executing a lambda wrapped as a TPL Task in the worker thread's context is a very convenient way to send the worker thread a message and get back the result. This can be done synchronously with WorkerWithTaskScheduler.Run().Wait/Result or asynchronously with await WorkerWithTaskScheduler.Run(). Note how ContinueExecution and WaitForPendingTasks are used to pause/resume/end the worker's main loop. I hope the code is self-explanatory, but let me know if I should clarify anything.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Console_21628490
{
    // Test
    class Program
    {
        static async Task DoWorkAsync()
        {
            Console.WriteLine("Initial thread: " + Thread.CurrentThread.ManagedThreadId);

            // the worker thread lambda
            Func<WorkerWithTaskScheduler<int>, int> workAction = (worker) =>
            {
                var result = 0;

                Console.WriteLine("Worker thread: " + Thread.CurrentThread.ManagedThreadId);

                while (worker.ContinueExecution)
                {
                    // observe cancellation
                    worker.Token.ThrowIfCancellationRequested();

                    // execute pending tasks scheduled with WorkerWithTaskScheduler.Run
                    worker.ExecutePendingTasks();

                    // do the work item
                    Thread.Sleep(200); // simulate work payload
                    result++;

                    Console.Write("\rDone so far: " + result);
                    if (result >= 100)
                        break; // done after 100 items
                }
                return result;
            };

            try
            {
                // cancel in 30s
                var cts = new CancellationTokenSource(30000);
                // start the worker
                var worker = new WorkerWithTaskScheduler<int>(workAction, cts.Token);

                // pause upon Enter
                Console.WriteLine("\nPress Enter to pause...");
                Console.ReadLine();
                worker.WaitForPendingTasks = true;

                // resume upon Enter
                Console.WriteLine("\nPress Enter to resume...");
                Console.ReadLine();
                worker.WaitForPendingTasks = false;

                // send a "message", i.e. run a lambda inside the worker thread
                var response = await worker.Run(() =>
                {
                    // do something in the context of the worker thread
                    return Thread.CurrentThread.ManagedThreadId;
                }, cts.Token);
                Console.WriteLine("\nReply from Worker thread: " + response);

                // End upon Enter
                Console.WriteLine("\nPress Enter to stop...");
                Console.ReadLine();

                // end the main loop gracefully so that the result can be retrieved
                worker.ContinueExecution = false; // or worker.Cancel() to throw
                var result = await worker.WorkerTask;

                Console.Write("\nFinished, result: " + result);
            }
            catch (Exception ex)
            {
                while (ex is AggregateException)
                    ex = ex.InnerException;
                Console.WriteLine(ex.Message);
            }
        }

        static void Main(string[] args)
        {
            DoWorkAsync().Wait();
            Console.WriteLine("\nPress Enter to Exit.");
            Console.ReadLine();
        }
    }

    //
    // WorkerWithTaskScheduler
    //

    public class WorkerWithTaskScheduler<TResult> : TaskScheduler, IDisposable
    {
        readonly CancellationTokenSource _workerCts;
        Task<TResult> _workerTask;

        readonly BlockingCollection<Task> _pendingTasks;
        Thread _workerThread;

        volatile bool _continueExecution = true;
        volatile bool _waitForTasks = false;

        // start the main loop
        public WorkerWithTaskScheduler(
            Func<WorkerWithTaskScheduler<TResult>, TResult> executeMainLoop,
            CancellationToken token)
        {
            _pendingTasks = new BlockingCollection<Task>();
            _workerCts = CancellationTokenSource.CreateLinkedTokenSource(token);

            _workerTask = Task.Factory.StartNew<TResult>(() =>
            {
                _workerThread = Thread.CurrentThread;
                return executeMainLoop(this);
            }, _workerCts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }

        // a sample main loop for the WorkerWithTaskScheduler constructor:
        // it only services queued tasks until ContinueExecution is set to false
        public static TResult ExecuteMainLoop(WorkerWithTaskScheduler<TResult> worker)
        {
            while (worker.ContinueExecution)
            {
                worker.Token.ThrowIfCancellationRequested();
                worker.ExecutePendingTasks();
            }
            return default(TResult);
        }

        // get the Task
        public Task<TResult> WorkerTask
        {
            get { return _workerTask; }
        }

        // get CancellationToken 
        public CancellationToken Token
        {
            get { return _workerCts.Token; }
        }

        // check/set if the main loop should continue
        public bool ContinueExecution
        {
            get { return _continueExecution; }
            set { _continueExecution = value; }
        }

        // request cancellation
        public void Cancel()
        {
            _workerCts.Cancel();
        }

        // check if we're on the correct thread
        public void VerifyWorkerThread()
        {
            if (Thread.CurrentThread != _workerThread)
                throw new InvalidOperationException("Invalid thread.");
        }

        // check if the worker task itself is still alive
        public void VerifyWorkerTask()
        {
            if (_workerTask == null || _workerTask.IsCompleted)
                throw new InvalidOperationException("The worker thread has ended.");
        }

        // make ExecutePendingTasks block (true) or not block (false)
        public bool WaitForPendingTasks
        {
            get { return _waitForTasks; }
            set
            {
                _waitForTasks = value;
                if (!value) // wake up a worker that may be blocked in Take()
                    Run(() => { }, this.Token);
            }
        }

        // execute all pending tasks and return
        public void ExecutePendingTasks()
        {
            VerifyWorkerThread();

            while (this.ContinueExecution)
            {
                this.Token.ThrowIfCancellationRequested();

                Task item;
                if (_waitForTasks)
                {
                    item = _pendingTasks.Take(this.Token);
                }
                else
                {
                    if (!_pendingTasks.TryTake(out item))
                        break;
                }

                TryExecuteTask(item);
            }
        }

        //
        // TaskScheduler methods
        //

        protected override void QueueTask(Task task)
        {
            _pendingTasks.Add(task);
        }

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return _pendingTasks.ToArray();
        }

        protected override bool TryExecuteTaskInline(
            Task task, bool taskWasPreviouslyQueued)
        {
            return _workerThread == Thread.CurrentThread &&
                TryExecuteTask(task);
        }

        public override int MaximumConcurrencyLevel
        {
            get { return 1; }
        }

        public void Dispose()
        {
            if (_workerTask != null)
            {
                _workerCts.Cancel();
                _workerTask.Wait();
                _pendingTasks.Dispose();
                _workerTask = null;
            }
        }

        //
        // Task.Factory.StartNew wrappers using this task scheduler
        //

        public Task Run(Action action, CancellationToken token)
        {
            VerifyWorkerTask();
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this);
        }

        public Task<T> Run<T>(Func<T> action, CancellationToken token)
        {
            VerifyWorkerTask();
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this);
        }

        public Task Run(Func<Task> action, CancellationToken token)
        {
            VerifyWorkerTask();
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
        }

        public Task<T> Run<T>(Func<Task<T>> action, CancellationToken token)
        {
            VerifyWorkerTask();
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
        }
    }
}

To implement worker-to-client notifications, you can use the IProgress<T> pattern (example of this).
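As a rough illustration of the IProgress<T> idea (not the linked example; `ProgressSketch` and `PollAsync` are hypothetical names made up for this sketch), the worker only needs to call Report and never touches UI controls itself:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ProgressSketch
{
    // Hypothetical polling loop: reports the running count after each poll.
    public static Task<int> PollAsync(
        IProgress<int> progress, int iterations, CancellationToken token)
    {
        return Task.Run(() =>
        {
            var count = 0;
            for (var i = 0; i < iterations; i++)
            {
                // observe cancellation
                token.ThrowIfCancellationRequested();

                Thread.Sleep(10); // simulate one poll of an external system
                count++;

                // hand the update to whoever created the progress object
                progress.Report(count);
            }
            return count;
        }, token);
    }
}
```

If the caller creates `new Progress<int>(n => label.Text = n.ToString())` on the UI thread (`label` being an assumed control), Progress<T> captures that thread's SynchronizationContext, so the callback runs there without an explicit Invoke. That should work the same way in WinForms and WPF.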

noseratio answered Sep 28 '22 14:09


The first thing that comes to mind, and the cleanest approach imo, is to make the continuously running background method an instance method of a class. That class instance can then expose properties/methods that allow others to change its state (e.g. from the UI). Some locking may be required, since the state is read and updated from different threads.
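A minimal sketch of that idea, assuming a single setting and a dedicated thread (`PollingWorker`, `IntervalMs` and `PollCount` are hypothetical names, not from any library):

```csharp
using System;
using System.Threading;

public sealed class PollingWorker
{
    private readonly object _gate = new object();
    // signaled = running, reset = paused
    private readonly ManualResetEventSlim _running = new ManualResetEventSlim(true);
    private volatile bool _stopRequested;
    private int _intervalMs = 100; // hypothetical setting, guarded by _gate
    private int _pollCount;
    private Thread _thread;

    // Setting that the UI thread may change at any time.
    public int IntervalMs
    {
        get { lock (_gate) return _intervalMs; }
        set { lock (_gate) _intervalMs = value; }
    }

    // Number of completed polls so far.
    public int PollCount
    {
        get { return Volatile.Read(ref _pollCount); }
    }

    public void Start()
    {
        _thread = new Thread(Loop) { IsBackground = true };
        _thread.Start();
    }

    public void Pause() { _running.Reset(); }
    public void Resume() { _running.Set(); }

    public void Stop()
    {
        _stopRequested = true;
        _running.Set(); // release the loop if it is currently paused
        if (_thread != null)
            _thread.Join();
    }

    private void Loop()
    {
        while (!_stopRequested)
        {
            _running.Wait(); // blocks here while paused
            if (_stopRequested)
                break;

            // poll the external systems here
            Interlocked.Increment(ref _pollCount);

            // the setting is re-read on every iteration
            Thread.Sleep(IntervalMs);
        }
    }
}
```

The UI would call Start once, then Pause, Resume, Stop or the IntervalMs setter from its event handlers. Note that Stop joins the worker thread, so it can block the caller for up to one poll interval.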

BrokenGlass answered Sep 28 '22 15:09