Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Thread.Interrupt equivalent for Task TPL

Some background: my C# code calls into some unmanaged code (C++) that does a blocking wait. The blocking wait, however, is alertable (like Thread.Sleep - I suppose it calls WaitForSingleObjectEx with bAlertable TRUE under the cover); I know for sure it is alertable, as it can be "waked up" by QueueUserAPC.

If I could simply use managed Threads, I would just call the blocking method, and then use Thread.Interrupt to "wake" the thread when I need it to exit; something like this:

void ThreadFunc() {
    try {
           Message message;
           comObject.GetMessage(out message);
           //....
     }
     catch (ThreadInterruptedException) {
        // We need to exit
        return;
     }
}

var t - new Thread(ThreadFunc);
//....
t.Interrupt();

(NOTE: I am not using this code, but it is something that, to the top of my knowledge, could work for this peculiar situation (alertable wait in unmanaged code out of my control). What I'm looking for is the best equivalent (or a better alternative!) to this in TPL).

But I have to use the TPL (Tasks instead of managed Threads), and the unmanaged method is out of my control (I cannot modify it to call WaitForMultipleObjectEx and make it return when I signal en Event, for example).

I am looking for a Thread.Interrupt equivalent for Tasks (something that will post an APC on the underlying thread). AFAIK, CancellationTokens require the code to be "Task aware", and do not use this technique, but I'm not sure: what happens, I wonder, if a task does a Thread.Sleep (I know there is a Task.Wait, but it's just for having an example of a non-task wait which is alertable), can it be cancelled?

Is my assumption wrong (I mean, could I just use a CT and everything will work? But how?).

If there is no such method... I'm open to suggestions. I'd really like to avoid to mix Threads and Tasks, or use P/Invoke, but if there is no other way, I would still like to do it in the "cleanest" way possible (which means: no rude aborts, and something "Tasky" :) )

Edit:

For those who are curious, I have "confirmed" that Thread.Interrupt could work in my case because it calls QueueUserAPC. It calls InterruptInternal, then Thread::UserInterrupt, then Alert, which queues the APC. It is actually quite clever, as it allows you to sleep/wait and then wake a thread without the need to use another synchronization primitive.

I just need to find a TPL primitive that follows the same flow

like image 319
Lorenzo Dematté Avatar asked Jan 24 '15 18:01

Lorenzo Dematté


2 Answers

Currently, all existing production CLR hosts implement one-to-one managed-to-unmanaged thread mapping. This is particularly true about Windows Desktop OS family where your legacy COM object runs.

In this light, you can use TPL's Task.Run instead of classic threading APIs and still call QueueUserAPC via p/invoke to release your COM object from alterable wait state, when the cancellation token has been triggered.

The code below shows how to do that. One thing to note, all ThreadPool threads (including those started by Task.Run) implicitly run under COM MTA apartment. Thus, the COM object needs to support the MTA model without implicit COM marshaling. If it isn't the case, you'll probably need a custom task scheduler (like StaTaskScheduler) to be used instead of Task.Run.

using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    class Program
    {
        static int ComGetMessage()
        {
            NativeMethods.SleepEx(2000, true);
            return 42;
        }

        static int GetMessage(CancellationToken token)
        {
            var apcWasCalled = false;
            var gcHandle = default(GCHandle);
            var apcCallback = new NativeMethods.APCProc(target => 
            {
                apcWasCalled = true;
                gcHandle.Free();
            });

            var hCurThread = NativeMethods.GetCurrentThread();
            var hCurProcess = NativeMethods.GetCurrentProcess();
            IntPtr hThread;
            if (!NativeMethods.DuplicateHandle(
                hCurProcess, hCurThread, hCurProcess, out hThread,
                0, false, NativeMethods.DUPLICATE_SAME_ACCESS))
            {
                throw new System.ComponentModel.Win32Exception(Marshal.GetLastWin32Error());
            }
            try
            {
                int result;
                using (token.Register(() => 
                    {
                        gcHandle = GCHandle.Alloc(apcCallback);
                        NativeMethods.QueueUserAPC(apcCallback, hThread, UIntPtr.Zero);
                    },
                    useSynchronizationContext: false))
                {
                    result = ComGetMessage();
                }
                Trace.WriteLine(new { apcWasCalled });
                token.ThrowIfCancellationRequested();
                return result;
            }
            finally
            {
                NativeMethods.CloseHandle(hThread);
            }
        }

        static async Task TestAsync(int delay)
        {
            var cts = new CancellationTokenSource(delay);
            try
            {
                var result = await Task.Run(() => GetMessage(cts.Token));
                Console.WriteLine(new { result });
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Cancelled.");
            }
        }

        static void Main(string[] args)
        {
            TestAsync(3000).Wait();
            TestAsync(1000).Wait();
        }

        static class NativeMethods
        {
            public delegate void APCProc(UIntPtr dwParam);

            [DllImport("kernel32.dll", SetLastError = true)]
            public static extern uint SleepEx(uint dwMilliseconds, bool bAlertable);

            [DllImport("kernel32.dll", SetLastError = true)]
            public static extern uint QueueUserAPC(APCProc pfnAPC, IntPtr hThread, UIntPtr dwData);

            [DllImport("kernel32.dll")]
            public static extern IntPtr GetCurrentThread();

            [DllImport("kernel32.dll")]
            public static extern IntPtr GetCurrentProcess();

            [DllImport("kernel32.dll", SetLastError = true)]
            public static extern bool CloseHandle(IntPtr handle);

            public const uint DUPLICATE_SAME_ACCESS = 2;

            [DllImport("kernel32.dll", SetLastError = true)]
            public static extern bool DuplicateHandle(IntPtr hSourceProcessHandle,
               IntPtr hSourceHandle, IntPtr hTargetProcessHandle, out IntPtr lpTargetHandle,
               uint dwDesiredAccess, bool bInheritHandle, uint dwOptions);
        }
    }
}
like image 145
noseratio Avatar answered Nov 01 '22 15:11

noseratio


I wonder, if a task does a Thread.Sleep (I know there is a Task.Wait, but it's just for having an example of a non-task wait which is alertable), can it be cancelled?

No, it cannot. Cancellation of tasks is defined by the user. It is cooperative cancellation which requires the user to explicitly check the state of the CancellationToken

Note that there is an overload of Task.Wait which takes a CancellationToken:

/// <summary>
/// Waits for the task to complete, for a timeout to occur, 
/// or for cancellation to be requested.
/// The method first spins and then falls back to blocking on a new event.
/// </summary>
/// <param name="millisecondsTimeout">The timeout.</param>
/// <param name="cancellationToken">The token.</param>
/// <returns>true if the task is completed; otherwise, false.</returns>
private bool SpinThenBlockingWait(int millisecondsTimeout, 
                                  CancellationToken cancellationToken)
{
    bool infiniteWait = millisecondsTimeout == Timeout.Infinite;
    uint startTimeTicks = infiniteWait ? 0 : (uint)Environment.TickCount;
    bool returnValue = SpinWait(millisecondsTimeout);
    if (!returnValue)
    {
        var mres = new SetOnInvokeMres();
        try
        {
            AddCompletionAction(mres, addBeforeOthers: true);
            if (infiniteWait)
            {
                returnValue = mres.Wait(Timeout.Infinite,
                                        cancellationToken);
            }
            else
            {
                uint elapsedTimeTicks = ((uint)Environment.TickCount) -
                                               startTimeTicks;
                if (elapsedTimeTicks < millisecondsTimeout)
                {
                    returnValue = mres.Wait((int)(millisecondsTimeout -
                                             elapsedTimeTicks), cancellationToken);
                }
            }
        }
        finally
        {
            if (!IsCompleted) RemoveContinuation(mres);
            // Don't Dispose of the MRES, because the continuation off
            // of this task may still be running.  
            // This is ok, however, as we never access the MRES' WaitHandle,
            // and thus no finalizable resources are actually allocated.
        }
    }
    return returnValue;
}

It will attempt to spin the thread on certain condition. If that isn't enough, it will end up calling Monitor.Wait which actually blocks:

/*========================================================================
** Waits for notification from the object (via a Pulse/PulseAll). 
** timeout indicates how long to wait before the method returns.
** This method acquires the monitor waithandle for the object 
** If this thread holds the monitor lock for the object, it releases it. 
** On exit from the method, it obtains the monitor lock back. 
** If exitContext is true then the synchronization domain for the context 
** (if in a synchronized context) is exited before the wait and reacquired 
**
** Exceptions: ArgumentNullException if object is null.
========================================================================*/
[System.Security.SecurityCritical]  // auto-generated
[ResourceExposure(ResourceScope.None)]
[MethodImplAttribute(MethodImplOptions.InternalCall)]
private static extern bool ObjWait(bool exitContext, int millisecondsTimeout, Object obj);
like image 37
Yuval Itzchakov Avatar answered Nov 01 '22 16:11

Yuval Itzchakov