Calling _thread.Join()
causes the GetConsumingEnumerable
loop to be stuck on the last element. Why does this behavior occur?
public abstract class ActorBase : IDisposable
{
private readonly BlockingCollection<Task> _queue = new BlockingCollection<Task>(new ConcurrentQueue<Task>());
private readonly Thread _thread;
private bool _isDisposed;
protected ActorBase()
{
_thread = new Thread(ProcessMessages);
_thread.Start();
}
protected void QueueTask(Task task)
{
if (_isDisposed)
{
throw new Exception("Actor was disposed, cannot queue task.");
}
_queue.Add(task);
}
private void ProcessMessages()
{
foreach (var task in _queue.GetConsumingEnumerable())
{
task.RunSynchronously();
}
}
public void Dispose()
{
_isDisposed = true;
_queue.CompleteAdding();
_thread.Join();
}
}
public class SampleActor : ActorBase
{
private string GetThreadStatus()
{
Thread.Sleep(500);
return string.Format("Running on thread {0}", Thread.CurrentThread.ManagedThreadId);
}
public async Task<string> GetThreadStatusAsync()
{
var task = new Task<string>(GetThreadStatus);
QueueTask(task);
return await task;
}
}
class Program
{
public static async Task Run()
{
using (var sa = new SampleActor())
{
for (int i = 0; i < 3; i++)
{
Console.WriteLine(await sa.GetThreadStatusAsync());
}
}
}
public static void Main(string[] args)
{
Console.WriteLine("Main thread id {0}", Thread.CurrentThread.ManagedThreadId);
var task = Task.Run(async ()=> { await Run(); });
task.Wait();
}
}
The context for this approach is that I need to make sure that all operations are executed on one OS thread, which would allow a part of the app to use different credentials than the main thread.
async-await
works with continuations. To be efficient and reduce scheduling these continuations usually run on the same thread that completed the previous task.
That means in your case that your special thread is not only running the tasks, it's also running all the continuations after these tasks (the for
loop itself). You can see that by printing the thread id:
using (var sa = new SampleActor())
{
for (int i = 0; i < 3; i++)
{
Console.WriteLine(await sa.GetThreadStatusAsync());
Console.WriteLine("Continue on thread :" + Thread.CurrentThread.ManagedThreadId);
}
}
When the for
loop completes and the SampleActor
is being disposed you call Thread.Join
from the same thread your are trying to join so you get a deadlock. Your situation boils down to this:
public static void Main()
{
Thread thread = null;
thread = new Thread(() =>
{
Thread.Sleep(100);
thread.Join();
Console.WriteLine("joined");
});
thread.Start();
}
In .Net 4.6 you can solve this with TaskCreationOptions.RunContinuationsAsynchronously
but in the current version you can specify the default TaskScheduler
:
public Task<string> GetThreadStatusAsync()
{
var task = new Task<string>(GetThreadStatus);
QueueTask(task);
return task.ContinueWith(task1 => task1.GetAwaiter().GetResult(), TaskScheduler.Default);
}
It might be tempting to put a simple check to see if the thread you're trying to Join
is Thread.CurrentThread
, but that would be wrong.
Furthermore, I think the whole approach - scheduling and running cold Task
objects with a custom, non-TPL-compliant scheduler - is wrong. You should be using a TPL-friendly task scheduler, similar to Stephen Toub's StaTaskScheduler
. Or run a custom SynchronizationContext
for your actor-serving thread (like Toub's AsyncPump
) and use TaskScheduler.FromCurrentSynchronizationContext
and Task.Factory.StartNew
to schedue tasks with your custom scheduler (or use Task.Start(TaskScheduler)
if you have to deal with cold tasks).
This way, you'll have full control of where tasks and their continuations run, as well as of task inlining.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With