
How to make Win32/MFC threads loop in lockstep?

I'm new to multithreading in Windows, so this might be a trivial question: what's the easiest way of making sure that threads perform a loop in lockstep?

I tried passing a shared array of Events to all threads and using WaitForMultipleObjects at the end of the loop to synchronize them, but this gives me a deadlock after one, sometimes two, cycles. Here's a simplified version of my current code (with just two threads, but I'd like to make it scalable):

typedef struct
{
    int rank;
    HANDLE* step_events;
} IterationParams;

int main(int argc, char **argv)
{
    // ...

    IterationParams p[2];
    HANDLE step_events[2];
    for (int j=0; j<2; ++j)
    {
        step_events[j] = CreateEvent(NULL, FALSE, FALSE, NULL);
    }

    for (int j=0; j<2; ++j)
    {
        p[j].rank = j;
        p[j].step_events = step_events;
        AfxBeginThread(Iteration, p+j);
    }

    // ...
}

UINT Iteration(LPVOID pParam)
{
    IterationParams* p = (IterationParams*)pParam;
    int rank = p->rank;

    for (int i=0; i<100; i++)
    {
        if (rank == 0)
        {
            printf("%dth iteration\n",i);
            // do something
            SetEvent(p->step_events[0]);
            WaitForMultipleObjects(2, p->step_events, TRUE, INFINITE);
        }
        else if (rank == 1)
        {
            // do something else
            SetEvent(p->step_events[1]);
            WaitForMultipleObjects(2, p->step_events, TRUE, INFINITE);
        }
    }
    return 0;
}

(I know I'm mixing C and C++, it's actually legacy C code that I'm trying to parallelize.)

Reading the docs at MSDN, I think this should work. However, thread 0 only prints once, occasionally twice, and then the program hangs. Is this a correct way of synchronizing threads? If not, what would you recommend? Is there really no built-in support for a barrier in MFC?


EDIT: this solution is WRONG, even including Alessandro's fix. For example, consider this scenario:

  1. Thread 0 sets its event and calls Wait, blocks
  2. Thread 1 sets its event and calls Wait, blocks
  3. Thread 0 returns from Wait, resets its event, and completes a cycle without Thread 1 getting control
  4. Thread 0 sets its own event and calls Wait. Since Thread 1 had no chance to reset its event yet, Thread 0's Wait returns immediately and the threads go out of sync.
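
The four steps above can be replayed in a small single-threaded model. This is only a sketch of the reasoning, not real Win32 code: `ManualResetEvents`, `Trace` and `replay_race` are hypothetical names, the events are plain booleans standing in for manual-reset events, and "wait" is reduced to checking whether both flags are set.

```cpp
#include <array>
#include <cassert>

// Hypothetical stand-in for two manual-reset events (not a Win32 type).
struct ManualResetEvents {
    std::array<bool, 2> signaled{{false, false}};
    void set(int i)   { signaled[i] = true; }
    void reset(int i) { signaled[i] = false; }
    // A wait-all on both events returns only when both are signaled.
    bool wait_all_would_return() const { return signaled[0] && signaled[1]; }
};

// How many iterations each thread has completed at the end of the replay.
struct Trace {
    int iterations_done_0;
    int iterations_done_1;
};

// Replays steps 1-4 in the order described, with thread 1 never scheduled
// after it blocks in step 2.
Trace replay_race() {
    ManualResetEvents ev;
    int done0 = 0;
    int done1 = 0;

    // Step 1: thread 0 sets its event and waits; it blocks.
    ev.set(0);
    assert(!ev.wait_all_would_return());

    // Step 2: thread 1 sets its event and waits; both waits are now satisfied.
    ev.set(1);
    assert(ev.wait_all_would_return());

    // Step 3: thread 0 wakes up (iteration finished), resets its own event
    // at the top of the loop, and does its work. Thread 1 has not run yet,
    // so its event is still signaled.
    ++done0;
    ev.reset(0);

    // Step 4: thread 0 sets its event again and waits. Thread 1's stale
    // signal lets the wait return immediately.
    ev.set(0);
    if (ev.wait_all_would_return()) {
        ++done0;
    }
    return Trace{done0, done1};
}
```

In this model thread 0 ends up with two completed iterations while thread 1 has completed none, which is exactly the loss of lockstep described in step 4.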

So the question remains: how does one safely ensure that the threads stay in lockstep?

suszterpatt asked Feb 09 '11 14:02


2 Answers

Introduction

I implemented a simple C++ program for your consideration (tested in Visual Studio 2010). It uses only Win32 APIs, plus the standard library for console output and a bit of randomization. You should be able to drop it into a new Win32 console project (without precompiled headers), then compile and run it.


Solution

#include <stdio.h>   // wprintf
#include <stdlib.h>  // srand, rand, RAND_MAX
#include <tchar.h>
#include <windows.h>


//---------------------------------------------------------
// Defines synchronization info structure. All threads will
// use the same instance of this struct to implement the
// rendezvous/barrier synchronization pattern.
struct SyncInfo
{
    SyncInfo(int threadsCount) : Awaiting(threadsCount), ThreadsCount(threadsCount), Semaphore(::CreateSemaphore(0, 0, 1024, 0)) {};
    ~SyncInfo() { ::CloseHandle(this->Semaphore); }
    volatile unsigned int Awaiting; // how many threads still have to complete their iteration
    const int ThreadsCount;
    const HANDLE Semaphore;
};


//---------------------------------------------------------
// Thread-specific parameters. Note that Sync is a reference
// (i.e. all threads share the same SyncInfo instance).
struct ThreadParams
{
    ThreadParams(SyncInfo &sync, int ordinal, int delay) : Sync(sync), Ordinal(ordinal), Delay(delay) {};
    SyncInfo &Sync;
    const int Ordinal;
    const int Delay;
};


//---------------------------------------------------------
// Called at the end of each iteration, it will "rendezvous"
// (meet) all the threads before returning (so that the next
// iteration can begin). In practical terms this function
// will block until all the other threads finish their iteration.
static void RandezvousOthers(SyncInfo &sync, int ordinal)
{
    if (0 == ::InterlockedDecrement(&(sync.Awaiting))) { // are we the last ones to arrive?
        // at this point, all the other threads have already arrived and
        // are blocking (or about to block) on the semaphore, so we can
        // reset the shared counter without having to worry about conflicts
        sync.Awaiting = sync.ThreadsCount;
        wprintf(L"Thread %d is the last to arrive, releasing synchronization barrier\n", ordinal);
        wprintf(L"---~~~---\n");

        // let's release the other threads from their slumber
        // by using the semaphore
        ::ReleaseSemaphore(sync.Semaphore, sync.ThreadsCount - 1, 0); // "ThreadsCount - 1" because this last thread will not block on semaphore
    }
    else { // nope, there are other threads still working on the iteration so let's wait
        wprintf(L"Thread %d is waiting on synchronization barrier\n", ordinal);
        ::WaitForSingleObject(sync.Semaphore, INFINITE); // note that return value should be validated at this point ;)
    }
}


//---------------------------------------------------------
// Defines the worker thread lifetime. It starts by retrieving
// thread-specific parameters, then loops through 5 iterations
// (rendezvous-ing with the other threads at the end of each),
// and then finishes (the thread can then be joined).
static DWORD WINAPI ThreadProc(void *p)
{
    ThreadParams *params = static_cast<ThreadParams *>(p);
    wprintf(L"Starting thread %d\n", params->Ordinal);

    for (int i = 1; i <= 5; ++i) {
        wprintf(L"Thread %d is executing iteration #%d (%d delay)\n", params->Ordinal, i, params->Delay);
        ::Sleep(params->Delay); 
        wprintf(L"Thread %d is synchronizing end of iteration #%d\n", params->Ordinal, i);
        RandezvousOthers(params->Sync, params->Ordinal);
    }

    wprintf(L"Finishing thread %d\n", params->Ordinal);
    return 0;
}


//---------------------------------------------------------
// Program to illustrate iteration-lockstep C++ solution.
int _tmain(int argc, _TCHAR* argv[])
{
    // prepare to run
    ::srand(::GetTickCount()); // pseudo-randomize random values :-)
    SyncInfo sync(4);
    ThreadParams p[] = {
        ThreadParams(sync, 1, ::rand() * 900 / RAND_MAX + 100), // a delay between 100 and 1000 milliseconds will simulate work that an iteration would do
        ThreadParams(sync, 2, ::rand() * 900 / RAND_MAX + 100),
        ThreadParams(sync, 3, ::rand() * 900 / RAND_MAX + 100),
        ThreadParams(sync, 4, ::rand() * 900 / RAND_MAX + 100),
    };

    // let the threads rip
    HANDLE t[] = {
        ::CreateThread(0, 0, ThreadProc, p + 0, 0, 0),
        ::CreateThread(0, 0, ThreadProc, p + 1, 0, 0),
        ::CreateThread(0, 0, ThreadProc, p + 2, 0, 0),
        ::CreateThread(0, 0, ThreadProc, p + 3, 0, 0),
    };

    // wait for the threads to finish (join)
    ::WaitForMultipleObjects(4, t, true, INFINITE);

    return 0;
}

Sample Output

Running this program on my machine (dual-core) yields the following output:

Starting thread 1
Starting thread 2
Starting thread 4
Thread 1 is executing iteration #1 (712 delay)
Starting thread 3
Thread 2 is executing iteration #1 (798 delay)
Thread 4 is executing iteration #1 (477 delay)
Thread 3 is executing iteration #1 (104 delay)
Thread 3 is synchronizing end of iteration #1
Thread 3 is waiting on synchronization barrier
Thread 4 is synchronizing end of iteration #1
Thread 4 is waiting on synchronization barrier
Thread 1 is synchronizing end of iteration #1
Thread 1 is waiting on synchronization barrier
Thread 2 is synchronizing end of iteration #1
Thread 2 is the last to arrive, releasing synchronization barrier
---~~~---
Thread 2 is executing iteration #2 (798 delay)
Thread 3 is executing iteration #2 (104 delay)
Thread 1 is executing iteration #2 (712 delay)
Thread 4 is executing iteration #2 (477 delay)
Thread 3 is synchronizing end of iteration #2
Thread 3 is waiting on synchronization barrier
Thread 4 is synchronizing end of iteration #2
Thread 4 is waiting on synchronization barrier
Thread 1 is synchronizing end of iteration #2
Thread 1 is waiting on synchronization barrier
Thread 2 is synchronizing end of iteration #2
Thread 2 is the last to arrive, releasing synchronization barrier
---~~~---
Thread 4 is executing iteration #3 (477 delay)
Thread 3 is executing iteration #3 (104 delay)
Thread 1 is executing iteration #3 (712 delay)
Thread 2 is executing iteration #3 (798 delay)
Thread 3 is synchronizing end of iteration #3
Thread 3 is waiting on synchronization barrier
Thread 4 is synchronizing end of iteration #3
Thread 4 is waiting on synchronization barrier
Thread 1 is synchronizing end of iteration #3
Thread 1 is waiting on synchronization barrier
Thread 2 is synchronizing end of iteration #3
Thread 2 is the last to arrive, releasing synchronization barrier
---~~~---
Thread 2 is executing iteration #4 (798 delay)
Thread 3 is executing iteration #4 (104 delay)
Thread 1 is executing iteration #4 (712 delay)
Thread 4 is executing iteration #4 (477 delay)
Thread 3 is synchronizing end of iteration #4
Thread 3 is waiting on synchronization barrier
Thread 4 is synchronizing end of iteration #4
Thread 4 is waiting on synchronization barrier
Thread 1 is synchronizing end of iteration #4
Thread 1 is waiting on synchronization barrier
Thread 2 is synchronizing end of iteration #4
Thread 2 is the last to arrive, releasing synchronization barrier
---~~~---
Thread 3 is executing iteration #5 (104 delay)
Thread 4 is executing iteration #5 (477 delay)
Thread 1 is executing iteration #5 (712 delay)
Thread 2 is executing iteration #5 (798 delay)
Thread 3 is synchronizing end of iteration #5
Thread 3 is waiting on synchronization barrier
Thread 4 is synchronizing end of iteration #5
Thread 4 is waiting on synchronization barrier
Thread 1 is synchronizing end of iteration #5
Thread 1 is waiting on synchronization barrier
Thread 2 is synchronizing end of iteration #5
Thread 2 is the last to arrive, releasing synchronization barrier
---~~~---
Finishing thread 4
Finishing thread 3
Finishing thread 2
Finishing thread 1

Note that, for simplicity, each thread is assigned a random iteration duration once, and every iteration of that thread uses the same duration (i.e. it doesn't change between iterations).


How does it work?

The "core" of the solution is the "RandezvousOthers" function. If the calling thread is not the last one to arrive, the function blocks on the shared semaphore. If the calling thread is the last one to arrive, the function resets the Awaiting counter in the shared SyncInfo structure and releases all the threads blocked on the semaphore.
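
For comparison, here is a minimal sketch of the same "last one to arrive releases the others" idea written with the C++11 standard library instead of Win32 calls. It is not the code from this answer: `LockstepBarrier` and `run_lockstep_demo` are hypothetical names, and it assumes a compiler with `<thread>` support (newer than Visual Studio 2010). The extra generation counter is a design choice of the sketch: a waiting thread only leaves once the generation has changed, so a fast thread that re-enters the barrier cannot slip through on a wakeup meant for the previous round.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical reusable barrier (illustrative only).
class LockstepBarrier {
public:
    explicit LockstepBarrier(int thread_count)
        : thread_count_(thread_count), awaiting_(thread_count), generation_(0) {}

    // Blocks until thread_count threads have called this function.
    void arrive_and_wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        const unsigned long my_generation = generation_;
        if (--awaiting_ == 0) {
            // Last to arrive: reset the counter and start the next round.
            awaiting_ = thread_count_;
            ++generation_;
            all_arrived_.notify_all();
        } else {
            // Leave only once the round this thread arrived in is over.
            all_arrived_.wait(lock, [&] { return generation_ != my_generation; });
        }
    }

private:
    std::mutex mutex_;
    std::condition_variable all_arrived_;
    const int thread_count_;
    int awaiting_;              // threads still expected in this round
    unsigned long generation_;  // incremented once per completed round
};

// Runs `threads` workers for `iterations` rounds and reports whether every
// worker observed the lockstep invariant after each barrier.
bool run_lockstep_demo(int threads, int iterations) {
    LockstepBarrier barrier(threads);
    std::atomic<int> finished_total(0);
    std::atomic<bool> in_lockstep(true);
    std::vector<std::thread> workers;
    for (int t = 0; t < threads; ++t) {
        workers.emplace_back([&] {
            for (int i = 1; i <= iterations; ++i) {
                ++finished_total;  // stands in for the work of iteration i
                barrier.arrive_and_wait();
                // After barrier i everyone has finished i units of work, and
                // the other threads can be at most one unit ahead.
                const int done = finished_total.load();
                if (done < i * threads || done > i * threads + threads - 1) {
                    in_lockstep = false;
                }
            }
        });
    }
    for (auto& w : workers) w.join();
    return in_lockstep.load() && finished_total.load() == threads * iterations;
}
```

Calling `run_lockstep_demo(4, 5)` mirrors the four threads and five iterations of the program above; it returns `true` when no worker ever ran ahead of the barrier.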

Milan Gardian answered Sep 27 '22 20:09


To make it work, set the second parameter of CreateEvent (bManualReset) to TRUE. This makes the events "manual reset", which prevents the WaitXxx functions from resetting them automatically. Then call ResetEvent at the beginning of the loop.

Loghorn answered Sep 27 '22 19:09