Is there a .NET class to do what ManualResetEvent.PulseAll() would do (if it existed)?
I have a need to atomically release a set of threads that are waiting on the same signal. (I'm not worried about "thread stampedes" for my intended usage.)
You cannot use a ManualResetEvent to do this. For example, if you do:
    ManualResetEventSlim signal = new ManualResetEventSlim();
    // ...
    signal.Set();
    signal.Reset();
Then no threads waiting on signal are released at all.
If you put a Thread.Sleep(5) between the Set() and Reset() calls, then some but not all of the waiting threads are released. Increasing the sleep to 10ms allows all the threads to be released. (This was tested with 20 threads.)
Clearly it is unacceptable to add Thread.Sleep() calls to make this work.
However, this is easy enough to do with Monitor.PulseAll(), and I've written a tiny class to do so.
(The reason I've written a class to do this is that we've found that the logic using Monitor, while fairly simple, is non-obvious enough to make it worth having such a class to simplify usage.)
My question is simply this: Is there a class already in .NET to do this?
For reference, here's the bare-bones version of my "ManualResetEvent.PulseAll()" equivalent:
    public sealed class Signaller
    {
        public void PulseAll()
        {
            lock (_lock)
            {
                Monitor.PulseAll(_lock);
            }
        }

        public void Wait()
        {
            Wait(Timeout.Infinite);
        }

        public bool Wait(int timeoutMilliseconds)
        {
            lock (_lock)
            {
                return Monitor.Wait(_lock, timeoutMilliseconds);
            }
        }

        private readonly object _lock = new object();
    }
Here's a sample program that demonstrates that no waiting threads are released if you don't sleep between Set() and Reset():
    using System;
    using System.Threading;
    using System.Threading.Tasks;

    namespace Demo
    {
        public static class Program
        {
            private static void Main(string[] args)
            {
                _startCounter = new CountdownEvent(NUM_THREADS);

                for (int i = 0; i < NUM_THREADS; ++i)
                {
                    int id = i;
                    Task.Factory.StartNew(() => test(id));
                }

                Console.WriteLine("Waiting for " + NUM_THREADS + " threads to start");
                _startCounter.Wait(); // Wait for all threads to have started.
                Thread.Sleep(100);
                Console.WriteLine("Threads all started. Setting signal now.");
                _signal.Set();
                // Thread.Sleep(5); // With no sleep at all, NO threads receive the signal. Try uncommenting this line.
                _signal.Reset();
                Thread.Sleep(1000);
                Console.WriteLine("\n{0}/{1} threads received the signal.\n\n", _signalledCount, NUM_THREADS);
                Console.WriteLine("Press any key to exit.");
                Console.ReadKey();
            }

            private static void test(int id)
            {
                _startCounter.Signal(); // Used so main thread knows when all threads have started.
                _signal.Wait();
                Interlocked.Increment(ref _signalledCount);
                Console.WriteLine("Task " + id + " received the signal.");
            }

            private const int NUM_THREADS = 20;
            private static readonly ManualResetEventSlim _signal = new ManualResetEventSlim();
            private static CountdownEvent _startCounter;
            private static int _signalledCount;
        }
    }
Set to signal that the waiting threads can proceed. All waiting threads are released. Once it has been signaled, ManualResetEvent remains signaled until it is manually reset by calling the Reset() method. That is, calls to WaitOne return immediately.
An AutoResetEvent resets when the code passes through event.WaitOne(), but a ManualResetEvent does not.
When set/triggered, the ManualResetEvent stays in that state until manually reset (hence the “manual” part). As such, all of the threads waiting on that event will observe the event moving to the signalled state – and then the waiting threads will release and continue execution.
Represents a thread synchronization event that, when signaled, must be reset manually. This class cannot be inherited. public sealed class ManualResetEvent : System.Threading.
One of the big differences between a ManualResetEvent and an AutoResetEvent (WaitHandles) in the .NET Framework is that a ManualResetEvent can wake up multiple waiting threads, whilst an AutoResetEvent can only wake up one of the waiting threads.
The example creates three threads, each of which blocks on the ManualResetEvent by calling its WaitOne method. When the user presses the Enter key, the example calls the Set method, which releases all three threads.
You can use a Barrier object. It lets a set of Tasks run up to a common point and then wait there until all the others have reached that point.
And you can use it in a way similar to WaitGroup in Go if you do not know which tasks from which code blocks will start to work as a specific unit of work.
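As a rough illustration of the Barrier suggestion, here is a minimal sketch. The class name BarrierSketch and the method Run are made up for this example, and the participant count is fixed up front (Barrier also has AddParticipant/RemoveParticipant if the count changes). Note that this is a rendezvous among peers: every thread both signals and waits, which is not quite the same as one pulser releasing a set of waiters.

```csharp
using System;
using System.Threading;

// Hypothetical helper for illustration only; not from the question or any library.
public static class BarrierSketch
{
    // Runs 'participants' threads through one barrier phase. For each thread,
    // returns how many threads had finished step 1 when it got past the barrier.
    public static int[] Run(int participants)
    {
        int arrived = 0;
        var seenAfterBarrier = new int[participants];

        using (var barrier = new Barrier(participants))
        {
            var threads = new Thread[participants];
            for (int i = 0; i < participants; ++i)
            {
                int id = i; // per-thread copy of the loop variable
                threads[i] = new Thread(() =>
                {
                    Interlocked.Increment(ref arrived);  // step 1: work done before the rendezvous
                    barrier.SignalAndWait();             // blocks until every participant has arrived
                    seenAfterBarrier[id] = Volatile.Read(ref arrived); // step 2: only after all arrived
                });
                threads[i].Start();
            }

            foreach (var t in threads)
                t.Join();
        }

        return seenAfterBarrier;
    }
}
```

Because no thread can pass SignalAndWait() until all participants have called it, every entry in the returned array should equal the participant count.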
version 1
Maximum clarity: a new ManualResetEvent is eagerly installed at the beginning of each PulseAll cycle.
    public class PulseEvent
    {
        public PulseEvent()
        {
            mre = new ManualResetEvent(false);
        }

        ManualResetEvent mre;

        public void PulseAll() => Interlocked.Exchange(ref mre, new ManualResetEvent(false)).Set();

        public bool Wait(int ms) => Volatile.Read(ref mre).WaitOne(ms);

        public void Wait() => Wait(Timeout.Infinite);
    };
version 2
This version avoids creating the internal event for any PulseAll cycles that happen to complete without waiters. The first waiter(s), per cycle, enter an optimistic lock-free race to create and atomically install a single shared event.
    public class PulseEvent
    {
        ManualResetEvent mre;

        public void PulseAll() => Interlocked.Exchange(ref mre, null)?.Set();

        public bool Wait(int ms)
        {
            ManualResetEvent tmp =
                mre ??
                Interlocked.CompareExchange(ref mre, tmp = new ManualResetEvent(false), null) ??
                tmp;
            return tmp.WaitOne(ms);
        }

        public void Wait() => Wait(Timeout.Infinite);
    };
version 3
This version eliminates per-cycle allocations by allocating two persistent ManualResetEvent objects and flipping between them. This slightly alters the semantics versus the above examples, as follows:
First, recycling the same two locks means that your PulseAll cycles must be long enough to allow all of the waiters to clear the previous lock. Otherwise, when you call PulseAll twice in quick succession, any waiting threads that were putatively released by the previous PulseAll call, but which the OS hasn't had a chance to schedule yet, may end up getting re-blocked for the new cycle as well. I mention this mostly as a theoretical consideration, because it's a moot issue unless you block an extreme number of threads on sub-microsecond pulse cycles. You can decide whether this condition is relevant for your situation or not. If so, or if you're unsure or cautious, you can always use version 1 or version 2 above, which don't have this limitation.
Second, and "arguably" different (but see the paragraph below for why this point may be provably irrelevant): in this version, calls to PulseAll that are deemed essentially simultaneous are merged, meaning all but one of those multiple "simultaneous" callers become NOPs. Such behavior is not without precedent (see "Remarks" here) and may be desirable, depending on the application.
Note that the latter point must be considered a legitimate design choice, as opposed to a bug, theoretical flaw or concurrency error. This is because pulse locks are inherently ambiguous when multiple PulseAll calls are simultaneous: specifically, there's no way to prove that any waiter who doesn't get released by the single, designated pulser would necessarily have been released by one of the other merged/elided pulses either.
Put another way, this type of lock isn't designed to atomically serialize the PulseAll callers, and in fact it can't be: it will always be possible for a skipped "simultaneous" pulse to independently come and go, even entirely after the time of the merged pulse, and yet still "pulse" before the arrival of the waiter (who therefore wouldn't get pulsed).
    public class PulseEvent
    {
        public PulseEvent()
        {
            cur = new ManualResetEvent(false);
            alt = new ManualResetEvent(true);
        }

        ManualResetEvent cur, alt;

        public void PulseAll()
        {
            ManualResetEvent tmp;
            if ((tmp = Interlocked.Exchange(ref alt, null)) != null) // try claiming 'pulser'
            {
                tmp.Reset();                                         // prepare for re-use, ending previous cycle
                (tmp = Interlocked.Exchange(ref cur, tmp)).Set();    // atomic swap & pulse
                Volatile.Write(ref alt, tmp);                        // release claim; re-allow 'pulser' claims
            }
        }

        public bool Wait(int ms) => cur.WaitOne(ms); // 'cur' is never null (unlike 'alt')

        public void Wait() => Wait(Timeout.Infinite);
    };
Finally, a couple of general observations. An important recurring theme here, and in this type of code generally, is that the ManualResetEvent must not be changed to the signalled state (i.e. by calling Set) while it is still publicly visible. In the above code, we use Interlocked.Exchange to atomically change the identity of the active lock in 'cur' (in this case, by instantaneously swapping in the alternate), and doing this before the Set is crucial for guaranteeing that no new waiters can be added to that ManualResetEvent beyond those that were already blocked at the moment of swapping.
Only after this swap is it safe to release those waiting threads by calling Set on our (now-)private copy. If we were to call Set on the ManualResetEvent while it was still published, it would be possible for a late-arriving waiter who had actually missed the instantaneous pulse to nevertheless see the open lock and sail through, instead of waiting for the next pulse as the definition requires.
Interestingly, this means that even though it might intuitively feel like the exact moment that the "pulsing" occurs should coincide with Set being called, in fact it is more correctly said to be right before that, at the moment of the Interlocked.Exchange, because that's the action that strictly establishes the before/after cut-off time and seals the definitive set of waiters (if any) who are to be released.
So waiters who miss the cut-off and arrive immediately after must only be able to see, and will block on, the event now designated for the next cycle. This is true even if the current cycle hasn't been signalled yet and none of its waiting threads have been released, all as required for correctness of "instantaneous" pulsing.
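To make the cut-off concrete, here is a small sketch based on version 1 with the swap and the Set written as two explicit steps. The class name PulseEventSketch and the Capture() helper are hypothetical additions for illustration only; Capture() simply exposes the moment at which a waiter "joins" a cycle, so the before/after behaviour can be observed from a single thread.

```csharp
using System;
using System.Threading;

// Hypothetical teaching variant of version 1; Capture() is not part of the original PulseEvent.
public class PulseEventSketch
{
    ManualResetEvent mre = new ManualResetEvent(false);

    // Returns the event for the current cycle. A waiter joins a cycle at this read.
    public WaitHandle Capture() => Volatile.Read(ref mre);

    public void PulseAll()
    {
        // Step 1: swap in a fresh, unsignalled event. This is the cut-off:
        // anyone reading 'mre' from now on sees only the next cycle's event.
        ManualResetEvent old = Interlocked.Exchange(ref mre, new ManualResetEvent(false));

        // Step 2: signal the retired event, which no new waiter can reach any more.
        old.Set();
    }

    public bool Wait(int ms) => Capture().WaitOne(ms);

    public void Wait() => Wait(Timeout.Infinite);
}
```

If a caller takes Capture() before PulseAll() and checks that handle afterwards with WaitOne(0), it should report signalled, whereas a Wait(0) issued after PulseAll() should time out, because it only ever sees the event for the next cycle.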