
Cross Process Event - Release all waiters reliably

I have created a cross-process event via ManualResetEvent. When this event occurs, potentially n threads in n different processes should be unblocked and start running to fetch the new data. The problem is that ManualResetEvent.Set followed by an immediate Reset does not seem to wake all waiting threads. The docs are pretty vague on this:

http://msdn.microsoft.com/en-us/library/windows/desktop/ms682396(v=vs.85).aspx

When the state of a manual-reset event object is signaled, it remains signaled until it is explicitly reset to nonsignaled by the ResetEvent function. Any number of waiting threads, or threads that subsequently begin wait operations for the specified event object, can be released while the object's state is signaled.

There is a method called PulseEvent which seems to do exactly what I need, but unfortunately it is also flawed:

A thread waiting on a synchronization object can be momentarily removed from the wait state by a kernel-mode APC, and then returned to the wait state after the APC is complete. If the call to PulseEvent occurs during the time when the thread has been removed from the wait state, the thread will not be released because PulseEvent releases only those threads that are waiting at the moment it is called. Therefore, PulseEvent is unreliable and should not be used by new applications. Instead, use condition variables.

MS recommends using condition variables instead, but they have their own limitation:

Condition variables are synchronization primitives that enable threads to wait until a particular condition occurs. Condition variables are user-mode objects that cannot be shared across processes.

Following the docs, I seem to be out of luck for doing this reliably. Is there an easy way to accomplish the same thing without the stated limitations using one ManualResetEvent, or do I need to create a response event for each listener process to get an ACK from each subscribed caller? In that case I would need a small shared memory region to register the PIDs of the subscribed processes, but that seems to bring its own set of problems. What happens when one process crashes or does not respond? ...

To give some context: I have new state to publish, which all other processes should read from a shared memory location. It is OK to miss an update when several occur at once, but each process must read at least the latest value. I could poll with a timeout, but that does not seem like a correct solution.

Currently I am down to:

ChangeEvent = new EventWaitHandle(false, EventResetMode.ManualReset, counterName + "_Event");

ChangeEvent.Set();
Thread.Sleep(1); // increase odds to release all waiters
ChangeEvent.Reset();

Alois Kraus asked May 06 '13 06:05


2 Answers

One general-purpose option for the case where producers must wake all consumers and the number of consumers changes over time is a moving fence approach. This option also requires a shared memory IPC region. The method does sometimes wake consumers when no work is present, especially if many processes need scheduling and load is high, but consumers will always wake except on hopelessly overloaded machines.

Create several manual-reset events and have the producers maintain a counter identifying the next event that will be set. All events are left set, except the NextToFire event. Consumer processes wait on the NextToFire event. When the producer wishes to wake all consumers, it resets the Next+1 event and sets the current event. All consumers will eventually be scheduled and then wait on the new NextToFire event. The effect is that only the producer uses ResetEvent, but consumers always know which event will wake them next.

All Users Init: (pseudo code is C/C++, not C#)

// Create Shared Memory and initialise NextToFire;
pSharedMemory = MapMySharedMemory();
if (First to create memory) pSharedMemory->NextToFire = 0;

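// Four named manual-reset events (bManualReset = TRUE), created nonsignaled.
// The count must be a power of two because indices are wrapped with "& 3".
HANDLE Array[4];
Array[0] = CreateEvent(NULL, TRUE, FALSE, "Event1");
Array[1] = CreateEvent(NULL, TRUE, FALSE, "Event2");
Array[2] = CreateEvent(NULL, TRUE, FALSE, "Event3");
Array[3] = CreateEvent(NULL, TRUE, FALSE, "Event4");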

Producer, to wake all consumers:

long CurrentNdx = pSharedMemory->NextToFire;
long NextNdx = (CurrentNdx+1) & 3;

// Reset next event so consumers block
ResetEvent(Array[NextNdx]);

// Flag to consumers new value
long Actual = InterlockedIncrement(&pSharedMemory->NextToFire) & 3;

// Next line is needed only if multiple producers are active.
// Not a perfect solution
if (Actual != NextNdx) ResetEvent(Array[Actual]);

// Now wake them all up
SetEvent(Array[CurrentNdx]);

Consumer wait logic:

// Read the index of the event that will fire next, then block on it
long CurrentNdx = (pSharedMemory->NextToFire) & 3;
WaitForSingleObject(Array[CurrentNdx], Timeout);
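
The index bookkeeping above can be checked without any Win32 calls. The following is only a minimal in-process sketch, not the answer's actual implementation: `MovingFence`, `waitIndex` and `fire` are hypothetical names, each `bool` stands in for one named manual-reset event, and the atomic counter stands in for `pSharedMemory->NextToFire`. It assumes a single producer.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>

// In-process model of the four-event ring (hypothetical names throughout).
// Each bool stands in for one named manual-reset event: true = signaled.
struct MovingFence {
    static constexpr std::size_t kEvents = 4;   // must be a power of two
    std::array<bool, kEvents> signaled{};       // all start nonsignaled, as in the init code
    std::atomic<long> nextToFire{0};            // stands in for pSharedMemory->NextToFire

    // Index of the event a consumer should wait on right now.
    std::size_t waitIndex() const {
        return static_cast<std::size_t>(nextToFire.load()) & (kEvents - 1);
    }

    // Producer side (single producer assumed): close the next gate,
    // publish the new index, then open the current gate.
    // Returns the index of the event that was set.
    std::size_t fire() {
        const std::size_t current = waitIndex();
        const std::size_t next = (current + 1) & (kEvents - 1);
        signaled[next] = false;     // models ResetEvent(Array[NextNdx])
        nextToFire.fetch_add(1);    // models InterlockedIncrement(&NextToFire)
        signaled[current] = true;   // models SetEvent(Array[CurrentNdx])
        assert(signaled[current] && !signaled[next]);
        return current;
    }
};
```

After each `fire()`, the event consumers were blocked on is open and stays open, while the event that `waitIndex()` now points at is closed. A consumer that is scheduled late therefore still passes through the old gate instead of missing a short Set/Reset window.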

rlb answered Oct 06 '22 11:10


Since .NET 4.0 you can use MemoryMappedFile to share memory between processes. Write a counter to the MemoryMappedFile and have each worker process decrement it. When the counter reaches zero, the main process is allowed to reset the event. Here is sample code.

Main Process

//number of WorkerProcess
int numWorkerProcess = 5;

//Create the MemoryMappedFile object and accessor. 4 is the size of an int in bytes.
MemoryMappedFile mmf = MemoryMappedFile.CreateNew("test_mmf", 4);
MemoryMappedViewAccessor accessor = mmf.CreateViewAccessor();

EventWaitHandle ChangeEvent = new EventWaitHandle(false, EventResetMode.ManualReset, counterName + "_Event");

//write counter to MemoryMappedFile
accessor.Write(0, numWorkerProcess);

//.....

ChangeEvent.Set();

//spin-wait until all worker processes have decremented the counter
SpinWait.SpinUntil(() => {

    int numLeft = accessor.ReadInt32(0);
    return (numLeft == 0);
});


ChangeEvent.Reset();

WorkerProcess

//Open the existing MemoryMappedFile object that was created by the main process.
MemoryMappedFile mmf = MemoryMappedFile.OpenExisting("test_mmf");
MemoryMappedViewAccessor accessor = mmf.CreateViewAccessor();

//This mutex guards the counter decrement.
Mutex mutex = new Mutex(false, "test_mutex");
//The event name must match the one used by the main process.
EventWaitHandle ChangeEvent = new EventWaitHandle(false, EventResetMode.ManualReset, counterName + "_Event");

//....

ChangeEvent.WaitOne();

//some job...

//decrement counter with mutex lock. 
mutex.WaitOne();
int count = accessor.ReadInt32(0);
--count;
accessor.Write(0, count);
mutex.ReleaseMutex();
/////////////////////////////////////

If your environment predates .NET 4.0, you can achieve the same thing with the CreateFileMapping function from the Win32 API.
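
The read, decrement, write sequence under the mutex could in principle be collapsed into one atomic decrement. The sketch below only models that handshake in-process with `std::atomic`; `AckCounter` and its members are hypothetical names, and a real cross-process version would need the counter to live in the shared mapping and use an interlocked operation on it (an assumption, not something the answer shows).

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for the 4-byte counter kept in the memory-mapped file.
struct AckCounter {
    std::atomic<int> remaining{0};

    // Main process: arm the counter before setting the event.
    void arm(int workers) { remaining.store(workers); }

    // Worker: acknowledge the wake-up with a single atomic decrement.
    // Returns true for the last worker to report in.
    bool acknowledge() {
        const int before = remaining.fetch_sub(1);
        assert(before > 0);  // more acknowledgements than armed workers is a bug
        return before == 1;
    }

    // Main process: the event may be reset once this returns true.
    bool allAcknowledged() const { return remaining.load() == 0; }
};
```

Note that the counter never reaches zero if a worker crashes before acknowledging, which is the failure case raised in the question, so the main process probably wants a timeout on its wait rather than an unbounded spin.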

Darksanta answered Oct 06 '22 11:10