I'm new to multithreading in Windows, so this might be a trivial question: what's the easiest way of making sure that threads perform a loop in lockstep?
I tried passing a shared array of Event
s to all threads and using WaitForMultipleObjects
at the end of the loop to synchronize them, but this gives me a deadlock after one, sometimes two, cycles. Here's a simplified version of my current code (with just two threads, but I'd like to make it scalable):
typedef struct
{
int rank;
HANDLE* step_events;
} IterationParams;
int main(int argc, char **argv)
{
// ...
IterationParams p[2];
HANDLE step_events[2];
for (int j=0; j<2; ++j)
{
step_events[j] = CreateEvent(NULL, FALSE, FALSE, NULL);
}
for (int j=0; j<2; ++j)
{
p[j].rank = j;
p[j].step_events = step_events;
AfxBeginThread(Iteration, p+j);
}
// ...
}
UINT Iteration(LPVOID pParam)
{
IterationParams* p = (IterationParams*)pParam;
int rank = p->rank;
for (int i=0; i<100; i++)
{
if (rank == 0)
{
printf("%dth iteration\n",i);
// do something
SetEvent(p->step_events[0]);
WaitForMultipleObjects(2, p->step_events, TRUE, INFINITE);
}
else if (rank == 1)
{
// do something else
SetEvent(p->step_events[1]);
WaitForMultipleObjects(2, p->step_events, TRUE, INFINITE);
}
}
return 0;
}
(I know I'm mixing C and C++, it's actually legacy C code that I'm trying to parallelize.)
Reading the docs at MSDN, I think this should work. However, thread 0 only prints once, occasionally twice, and then the program hangs. Is this a correct way of synchronizing threads? If not, what would you recommend (is there really no built-in support for a barrier in MFC?).
EDIT: this solution is WRONG, even including Alessandro's fix. For example, consider this scenario:
So the question remains: how does one safely ensure that the threads stay in lockstep?
I implemented a simple C++ program for your consideration (tested in Visual Studio 2010). It is using only Win32 APIs (and standard library for console output and a bit of randomization). You should be able to drop it into a new Win32 console project (without precompiled headers), compile and run.
#include <tchar.h>
#include <windows.h>
//---------------------------------------------------------
// Defines synchronization info structure. All threads will
// use the same instance of this struct to implement randezvous/
// barrier synchronization pattern.
struct SyncInfo
{
SyncInfo(int threadsCount) : Awaiting(threadsCount), ThreadsCount(threadsCount), Semaphore(::CreateSemaphore(0, 0, 1024, 0)) {};
~SyncInfo() { ::CloseHandle(this->Semaphore); }
volatile unsigned int Awaiting; // how many threads still have to complete their iteration
const int ThreadsCount;
const HANDLE Semaphore;
};
//---------------------------------------------------------
// Thread-specific parameters. Note that Sync is a reference
// (i.e. all threads share the same SyncInfo instance).
struct ThreadParams
{
ThreadParams(SyncInfo &sync, int ordinal, int delay) : Sync(sync), Ordinal(ordinal), Delay(delay) {};
SyncInfo &Sync;
const int Ordinal;
const int Delay;
};
//---------------------------------------------------------
// Called at the end of each itaration, it will "randezvous"
// (meet) all the threads before returning (so that next
// iteration can begin). In practical terms this function
// will block until all the other threads finish their iteration.
static void RandezvousOthers(SyncInfo &sync, int ordinal)
{
if (0 == ::InterlockedDecrement(&(sync.Awaiting))) { // are we the last ones to arrive?
// at this point, all the other threads are blocking on the semaphore
// so we can manipulate shared structures without having to worry
// about conflicts
sync.Awaiting = sync.ThreadsCount;
wprintf(L"Thread %d is the last to arrive, releasing synchronization barrier\n", ordinal);
wprintf(L"---~~~---\n");
// let's release the other threads from their slumber
// by using the semaphore
::ReleaseSemaphore(sync.Semaphore, sync.ThreadsCount - 1, 0); // "ThreadsCount - 1" because this last thread will not block on semaphore
}
else { // nope, there are other threads still working on the iteration so let's wait
wprintf(L"Thread %d is waiting on synchronization barrier\n", ordinal);
::WaitForSingleObject(sync.Semaphore, INFINITE); // note that return value should be validated at this point ;)
}
}
//---------------------------------------------------------
// Define worker thread lifetime. It starts with retrieving
// thread-specific parameters, then loops through 5 iterations
// (randezvous-ing with other threads at the end of each),
// and then finishes (the thread can then be joined).
static DWORD WINAPI ThreadProc(void *p)
{
ThreadParams *params = static_cast<ThreadParams *>(p);
wprintf(L"Starting thread %d\n", params->Ordinal);
for (int i = 1; i <= 5; ++i) {
wprintf(L"Thread %d is executing iteration #%d (%d delay)\n", params->Ordinal, i, params->Delay);
::Sleep(params->Delay);
wprintf(L"Thread %d is synchronizing end of iteration #%d\n", params->Ordinal, i);
RandezvousOthers(params->Sync, params->Ordinal);
}
wprintf(L"Finishing thread %d\n", params->Ordinal);
return 0;
}
//---------------------------------------------------------
// Program to illustrate iteration-lockstep C++ solution.
int _tmain(int argc, _TCHAR* argv[])
{
// prepare to run
::srand(::GetTickCount()); // pseudo-randomize random values :-)
SyncInfo sync(4);
ThreadParams p[] = {
ThreadParams(sync, 1, ::rand() * 900 / RAND_MAX + 100), // a delay between 200 and 1000 milliseconds will simulate work that an iteration would do
ThreadParams(sync, 2, ::rand() * 900 / RAND_MAX + 100),
ThreadParams(sync, 3, ::rand() * 900 / RAND_MAX + 100),
ThreadParams(sync, 4, ::rand() * 900 / RAND_MAX + 100),
};
// let the threads rip
HANDLE t[] = {
::CreateThread(0, 0, ThreadProc, p + 0, 0, 0),
::CreateThread(0, 0, ThreadProc, p + 1, 0, 0),
::CreateThread(0, 0, ThreadProc, p + 2, 0, 0),
::CreateThread(0, 0, ThreadProc, p + 3, 0, 0),
};
// wait for the threads to finish (join)
::WaitForMultipleObjects(4, t, true, INFINITE);
return 0;
}
Running this program on my machine (dual-core) yields the following output:
Starting thread 1
Starting thread 2
Starting thread 4
Thread 1 is executing iteration #1 (712 delay)
Starting thread 3
Thread 2 is executing iteration #1 (798 delay)
Thread 4 is executing iteration #1 (477 delay)
Thread 3 is executing iteration #1 (104 delay)
Thread 3 is synchronizing end of iteration #1
Thread 3 is waiting on synchronization barrier
Thread 4 is synchronizing end of iteration #1
Thread 4 is waiting on synchronization barrier
Thread 1 is synchronizing end of iteration #1
Thread 1 is waiting on synchronization barrier
Thread 2 is synchronizing end of iteration #1
Thread 2 is the last to arrive, releasing synchronization barrier
---~~~---
Thread 2 is executing iteration #2 (798 delay)
Thread 3 is executing iteration #2 (104 delay)
Thread 1 is executing iteration #2 (712 delay)
Thread 4 is executing iteration #2 (477 delay)
Thread 3 is synchronizing end of iteration #2
Thread 3 is waiting on synchronization barrier
Thread 4 is synchronizing end of iteration #2
Thread 4 is waiting on synchronization barrier
Thread 1 is synchronizing end of iteration #2
Thread 1 is waiting on synchronization barrier
Thread 2 is synchronizing end of iteration #2
Thread 2 is the last to arrive, releasing synchronization barrier
---~~~---
Thread 4 is executing iteration #3 (477 delay)
Thread 3 is executing iteration #3 (104 delay)
Thread 1 is executing iteration #3 (712 delay)
Thread 2 is executing iteration #3 (798 delay)
Thread 3 is synchronizing end of iteration #3
Thread 3 is waiting on synchronization barrier
Thread 4 is synchronizing end of iteration #3
Thread 4 is waiting on synchronization barrier
Thread 1 is synchronizing end of iteration #3
Thread 1 is waiting on synchronization barrier
Thread 2 is synchronizing end of iteration #3
Thread 2 is the last to arrive, releasing synchronization barrier
---~~~---
Thread 2 is executing iteration #4 (798 delay)
Thread 3 is executing iteration #4 (104 delay)
Thread 1 is executing iteration #4 (712 delay)
Thread 4 is executing iteration #4 (477 delay)
Thread 3 is synchronizing end of iteration #4
Thread 3 is waiting on synchronization barrier
Thread 4 is synchronizing end of iteration #4
Thread 4 is waiting on synchronization barrier
Thread 1 is synchronizing end of iteration #4
Thread 1 is waiting on synchronization barrier
Thread 2 is synchronizing end of iteration #4
Thread 2 is the last to arrive, releasing synchronization barrier
---~~~---
Thread 3 is executing iteration #5 (104 delay)
Thread 4 is executing iteration #5 (477 delay)
Thread 1 is executing iteration #5 (712 delay)
Thread 2 is executing iteration #5 (798 delay)
Thread 3 is synchronizing end of iteration #5
Thread 3 is waiting on synchronization barrier
Thread 4 is synchronizing end of iteration #5
Thread 4 is waiting on synchronization barrier
Thread 1 is synchronizing end of iteration #5
Thread 1 is waiting on synchronization barrier
Thread 2 is synchronizing end of iteration #5
Thread 2 is the last to arrive, releasing synchronization barrier
---~~~---
Finishing thread 4
Finishing thread 3
Finishing thread 2
Finishing thread 1
Note that for simplicity each thread has random duration of iteration, but all iterations of that thread will use that same random duration (i.e. it doesn't change between iterations).
The "core" of the solution is in the "RandezvousOthers" function. This function will either block on a shared semaphore (if the thread on which this function was called was not the last one to call the function) or reset Sync structure and unblock all the threads blocking on a shared semaphore (if the thread on which this function was called was the last one to call the function).
To have it work, set the second parameter of CreateEvent
to TRUE
. This will make the events "manual reset" and prevents the Waitxxx
to reset it.
Then place a ResetEvent
at the beginning of the loop.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With