 

pthread broadcast and then wait?

I'm trying to set up several threads to sit in a wait state until they receive a pthread_cond_broadcast().

After completing a job, I want the threads to go back into their wait states.

I also want the process that called pthread_cond_broadcast() to wait for all of the threads to return to their wait states before continuing. In this case, it's the main() function that calls broadcast. I'm trying to do this by having main() do a pthread_cond_wait() after calling a broadcast.

void* Work::job(void* id)
{
    int idx = (long)id;

    while(1)
    {
        pthread_mutex_lock(&job_lock);

        while(!jobs_complete)
        {
            // wait for main to broadcast
            pthread_cond_wait(&can_work, &job_lock);
            pthread_mutex_unlock(&job_lock);

            // work here

            pthread_mutex_lock(&job_lock);
            ++jobs_completed;

            if(jobs_completed == NUM_THREADS)
            {
                jobs_complete = true;
                pthread_cond_signal(&jobs_done);
                pthread_mutex_unlock(&job_lock);
            }
            pthread_mutex_unlock(&job_lock);
        }

        pthread_mutex_unlock(&job_lock);
    }

    return NULL;
}

NUM_THREADS is 4, job_lock is a pthread_mutex_t, can_work and jobs_done are pthread_cond_t, jobs_completed is a bool and jobs_complete is an int.

// work

jobs_completed = false;
jobs_complete = 0;
pthread_mutex_lock(&job_lock);
pthread_cond_broadcast(&can_work);
pthread_cond_wait(&jobs_complete);
pthread_mutex_unlock(&job_lock);

// work that depends on jobs_complete

Right now, I'm doing this by calling pthread_cond_broadcast() and then pthread_cond_wait() right after it, but this seems to deadlock.

Can anyone explain how I should be doing this or where I went wrong? I'd appreciate any assistance.

Thanks!

asked Mar 28 '13 07:03 by noko


People also ask

What does Pthread cond broadcast do?

The pthread_cond_broadcast() function unblocks all threads currently blocked on the condition variable specified by cond (pthread_cond_signal(), by contrast, wakes up at least one of them). If no threads are currently blocked on the condition variable, the call has no effect.
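To see the broadcast semantics in action, here is a minimal sketch (all names are made up for illustration; compile with -pthread): several threads wait on one predicate and a single broadcast releases all of them. Because each waiter checks the demo_go predicate before waiting, a thread that has not yet reached pthread_cond_wait() when the broadcast fires still proceeds instead of missing it.

```cpp
#include <pthread.h>

// Shared state for the demo: a predicate (demo_go), a counter, and their guards.
static pthread_mutex_t demo_mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  demo_cv  = PTHREAD_COND_INITIALIZER;
static bool demo_go = false;
static int  demo_woken = 0;

static void* demo_waiter(void*)
{
    pthread_mutex_lock(&demo_mtx);
    while (!demo_go)                      // predicate loop: also covers spurious wakeups
        pthread_cond_wait(&demo_cv, &demo_mtx);
    ++demo_woken;                         // the mutex is held again here
    pthread_mutex_unlock(&demo_mtx);
    return nullptr;
}

// Starts n waiters (at most 16), releases all of them with one broadcast,
// and returns how many got past the wait.
int broadcast_demo(int n)
{
    pthread_t t[16];
    if (n > 16) n = 16;
    for (int i = 0; i < n; ++i)
        pthread_create(&t[i], nullptr, demo_waiter, nullptr);

    pthread_mutex_lock(&demo_mtx);
    demo_go = true;                       // change the predicate first...
    pthread_cond_broadcast(&demo_cv);     // ...then wake every current waiter
    pthread_mutex_unlock(&demo_mtx);

    for (int i = 0; i < n; ++i)
        pthread_join(t[i], nullptr);      // join makes demo_woken safe to read
    return demo_woken;
}
```

With pthread_cond_signal() in place of the broadcast, only one waiter would be guaranteed to wake, and the remaining joins could block indefinitely.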

Does Pthread cond wait unlock mutex?

The pthread_cond_wait() function atomically unlocks mutex and performs the wait for the condition. In this case, atomically means with respect to the mutex and the condition variable and other threads' access to those objects through the pthread condition variable interfaces.
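A small sketch of why the atomic unlock matters (names are illustrative; compile with -pthread): the consumer starts the producer while still holding the mutex, and the producer can only take that mutex once the consumer is inside pthread_cond_wait(). If the wait did not release the mutex, this program would deadlock.

```cpp
#include <pthread.h>

static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  c = PTHREAD_COND_INITIALIZER;
static bool ready = false;   // predicate, guarded by m
static int  payload = 0;     // data handed from producer to consumer

static void* producer(void*)
{
    // This lock succeeds while the consumer sits in pthread_cond_wait(),
    // because the wait released m atomically.
    pthread_mutex_lock(&m);
    payload = 42;
    ready = true;
    pthread_cond_signal(&c);
    pthread_mutex_unlock(&m);
    return nullptr;
}

int consume_one()
{
    pthread_mutex_lock(&m);
    pthread_t t;
    pthread_create(&t, nullptr, producer, nullptr);  // started while we hold m
    while (!ready)
        pthread_cond_wait(&c, &m);  // unlocks m, sleeps, relocks m before returning
    int v = payload;                // safe: m is held again here
    pthread_mutex_unlock(&m);
    pthread_join(t, nullptr);
    return v;
}
```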

Does pthread_cond_signal release mutex?

No. pthread_cond_signal() wakes up at least one thread waiting on the condition variable, but it does not release any mutex. It is normally called while holding the mutex that protects the predicate, and the signaling thread must then unlock that mutex, because the woken thread cannot return from pthread_cond_wait() until it has reacquired it.
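The sketch below (illustrative names; compile with -pthread) makes that ordering visible: the signaler records two events after calling pthread_cond_signal() while still holding the mutex, and the waiter's entry always comes last because it cannot leave pthread_cond_wait() until the mutex is unlocked. The result is the same even if the waiter has not started waiting when the signal is sent, since it then checks the predicate only after acquiring the mutex.

```cpp
#include <pthread.h>
#include <string>

static pthread_mutex_t sig_mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  sig_cv  = PTHREAD_COND_INITIALIZER;
static bool sig_flag = false;
static std::string sig_log;   // order of events, guarded by sig_mtx

static void* sig_waiter(void*)
{
    pthread_mutex_lock(&sig_mtx);
    while (!sig_flag)
        pthread_cond_wait(&sig_cv, &sig_mtx);
    sig_log += "woke;";          // only reached once the waiter owns the mutex again
    pthread_mutex_unlock(&sig_mtx);
    return nullptr;
}

std::string signal_order_demo()
{
    pthread_t t;
    pthread_create(&t, nullptr, sig_waiter, nullptr);

    pthread_mutex_lock(&sig_mtx);
    sig_flag = true;
    pthread_cond_signal(&sig_cv);    // wakes the waiter but keeps sig_mtx locked
    sig_log += "signaled;";
    sig_log += "still-holding;";     // the waiter cannot get past its wait yet
    pthread_mutex_unlock(&sig_mtx);  // now the waiter can reacquire and return

    pthread_join(t, nullptr);
    return sig_log;
}
```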

What does pthread_cond_wait return?

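The pthread_cond_wait() routine returns 0 on success or an error number on failure. It blocks until the condition variable is signaled or broadcast, although spurious wakeups are also permitted, which is why callers re-check their predicate in a loop. The function atomically releases the associated mutex before blocking and reacquires it before returning, so on a successful return (and on an ETIMEDOUT return from pthread_cond_timedwait()) the mutex is locked and owned by the calling thread.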
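To illustrate the return value and the reacquired mutex, this sketch uses the timed variant, pthread_cond_timedwait(), so the wait ends with ETIMEDOUT instead of blocking forever. timed_wait_demo() is a made-up helper; it checks ownership with pthread_mutex_trylock(), which for a default (non-recursive) mutex reports EBUSY whenever the mutex is already locked, including by the caller. Compile with -pthread.

```cpp
#include <pthread.h>
#include <errno.h>
#include <time.h>

// Waits up to ms milliseconds on a condition nobody signals. Returns the code
// from pthread_cond_timedwait() and sets *relocked if the mutex was held on return.
int timed_wait_demo(long ms, bool* relocked)
{
    pthread_mutex_t m;
    pthread_cond_t  c;
    pthread_mutex_init(&m, nullptr);
    pthread_cond_init(&c, nullptr);     // default clock is CLOCK_REALTIME
    bool ready = false;                 // predicate that nobody ever sets

    timespec deadline;
    clock_gettime(CLOCK_REALTIME, &deadline);
    deadline.tv_sec  += ms / 1000;
    deadline.tv_nsec += (ms % 1000) * 1000000L;
    if (deadline.tv_nsec >= 1000000000L) {   // normalise the nanoseconds field
        deadline.tv_sec  += 1;
        deadline.tv_nsec -= 1000000000L;
    }

    pthread_mutex_lock(&m);
    int rc = 0;
    while (!ready && rc == 0)           // 0 means woken (possibly spuriously): wait again
        rc = pthread_cond_timedwait(&c, &m, &deadline);

    *relocked = (pthread_mutex_trylock(&m) == EBUSY);  // still ours after the timeout
    pthread_mutex_unlock(&m);

    pthread_cond_destroy(&c);
    pthread_mutex_destroy(&m);
    return rc;
}
```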


2 Answers

I'm only posting this (which is almost all C code, but so is pthreads, so a little slack is kindly requested) to demonstrate one way of doing what I think you're trying to accomplish. Obviously you would want to properly encapsulate most of this in classes, etc. What this will hopefully show you is how condition variables and mutexes work together with predicate management and notification.

I hope you find it useful. Have a great day.

#include <cstdint>    // intptr_t
#include <iostream>
#include <pthread.h>
using namespace std;

// our global condition variable and mutex
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

// our predicate values.
bool finished = false;
int jobs_waiting = 0;
int jobs_completed = 0;

// our thread proc
static void *worker_proc(void* p)
{
    intptr_t id = (intptr_t)p;  // our id
    size_t n_completed = 0;     // our job completion count

    // always latch prior to eval'ing predicate vars.
    pthread_mutex_lock(&mtx);
    while (!finished)
    {
        // wait for finish or work-waiting predicate
        while (!finished && jobs_waiting == 0)
            pthread_cond_wait(&cv, &mtx);

        // we own the mutex, so we're free to look at, modify
        //  etc. the values(s) that we're using for our predicate
        if (finished)
            break;

        // must be a job_waiting, reduce that number by one, then
        //  unlock the mutex and start our work. Note that we're
        //  changing the predicate (jobs_waiting is part of it) and
        //  we therefore need to let anyone that is monitoring know.
        --jobs_waiting;
        pthread_cond_broadcast(&cv);
        pthread_mutex_unlock(&mtx);

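        // DO WORK HERE (this just runs a lame summation). The accumulator is
        //  unsigned because the sum exceeds INT_MAX (signed overflow is UB),
        //  and volatile so the compiler can't optimize the busy-work away.
        volatile unsigned long x = 0;
        for (unsigned long i = 1; i <= 1048576; ++i)
            x = x + i;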
        ++n_completed;

        // finished work latch mutex and setup changes
        pthread_mutex_lock(&mtx);
        ++jobs_completed;
        pthread_cond_broadcast(&cv);
    }

    // final report
    cout << id << ": jobs completed = " << n_completed << endl;

    // we always exit owning the mutex, so unlock it now. but
    //  let anyone else know they should be quitting as well.
    pthread_cond_broadcast(&cv);
    pthread_mutex_unlock(&mtx);
    return p;
}

// sets up a batch of work and waits for it to finish.
void run_batch(int num)
{
    pthread_mutex_lock(&mtx);
    jobs_waiting = num;
    jobs_completed = 0;
    pthread_cond_broadcast(&cv);

    // wait for all jobs to complete.
    while (jobs_completed != num)
        pthread_cond_wait(&cv, &mtx);

    // we own this coming out, so let it go.
    pthread_mutex_unlock(&mtx);
}

// main entry point.
int main()
{
    // number of threads in our crew
    static const size_t N = 7;
    pthread_t thrds[N] = {0};

    // startup thread crew.
    intptr_t id = 0;
    for (size_t i=0; i<N; ++i)
        pthread_create(thrds + i, NULL, worker_proc, (void*)(++id));

    // run through batches. each batch is one larger
    //  than the prior batch. this should result in some
    //  interesting job-counts per-thread.
    for (int i=0; i<64; ++i)
        run_batch(i);

    // flag for shutdown state.
    pthread_mutex_lock(&mtx);
    finished = true;
    pthread_cond_broadcast(&cv);
    pthread_mutex_unlock(&mtx);
    for (size_t i=0; i<N; pthread_join(thrds[i++], NULL));

    return 0;
}

Sample Output #1

3: jobs completed = 256
6: jobs completed = 282
5: jobs completed = 292
2: jobs completed = 242
1: jobs completed = 339
4: jobs completed = 260
7: jobs completed = 409

Sample Output #2

6: jobs completed = 882
1: jobs completed = 210
4: jobs completed = 179
5: jobs completed = 178
2: jobs completed = 187
7: jobs completed = 186
3: jobs completed = 194

Sample Output #3

1: jobs completed = 268
6: jobs completed = 559
3: jobs completed = 279
5: jobs completed = 270
2: jobs completed = 164
4: jobs completed = 317
7: jobs completed = 159

Fixed Batch Sizes

The same code, but changing this:

for (int i=0; i<64; ++i)
    run_batch(i);

to this:

for (int i=0; i<64; ++i)
    run_batch(N);

gives the following, which is probably even closer to what you're really looking for.

Sample Output #1

4: jobs completed = 65
2: jobs completed = 63
5: jobs completed = 66
3: jobs completed = 63
1: jobs completed = 64
7: jobs completed = 63
6: jobs completed = 64

Sample Output #2

3: jobs completed = 65
5: jobs completed = 62
1: jobs completed = 67
7: jobs completed = 63
2: jobs completed = 65
6: jobs completed = 61
4: jobs completed = 65

Sample Output #3

2: jobs completed = 58
4: jobs completed = 61
5: jobs completed = 69
7: jobs completed = 68
3: jobs completed = 61
1: jobs completed = 64
6: jobs completed = 67
answered Oct 18 '22 04:10 by WhozCraig


You have up to three successive calls to pthread_mutex_unlock near the end of your function, which results in undefined behaviour, since a thread may only unlock a mutex it currently holds. You actually don't need the two inner ones: if jobs_complete is true, the thread will exit the loop and release the lock; otherwise it will loop around and needs the lock for the wait on the can_work condition.
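For illustration, here is one way the worker loop from the question could be restructured so that every lock is matched by exactly one unlock. It is a sketch, not the asker's code: it keeps the question's names (job_lock, can_work, jobs_done, jobs_completed, NUM_THREADS) but adds hypothetical round_no, stop and work_done variables, treats jobs_completed as an int counter, and uses made-up drivers run_round() and run_rounds(). The round counter lets a worker tell a new batch apart from a spurious wakeup, and main waits on jobs_done in a predicate loop. Compile with -pthread.

```cpp
#include <pthread.h>

// Names follow the question; round_no, stop and work_done are hypothetical additions.
static const int NUM_THREADS = 4;
static pthread_mutex_t job_lock  = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  can_work  = PTHREAD_COND_INITIALIZER;  // main -> workers
static pthread_cond_t  jobs_done = PTHREAD_COND_INITIALIZER;  // workers -> main
static unsigned round_no = 0;    // bumped by main for each new batch
static int  jobs_completed = 0;  // an int counter, not a bool
static bool stop = false;
static int  work_done = 0;       // stand-in for real results, guarded by job_lock

static void* job(void*)
{
    unsigned seen = 0;                          // last round this worker handled
    pthread_mutex_lock(&job_lock);
    for (;;)
    {
        while (!stop && round_no == seen)       // sleep until a new round starts
            pthread_cond_wait(&can_work, &job_lock);
        if (stop)
            break;
        seen = round_no;
        pthread_mutex_unlock(&job_lock);

        // ... real work would go here, without holding the lock ...

        pthread_mutex_lock(&job_lock);
        ++work_done;
        if (++jobs_completed == NUM_THREADS)
            pthread_cond_signal(&jobs_done);    // the last finisher wakes main
    }
    pthread_mutex_unlock(&job_lock);            // exactly one unlock for the initial lock
    return nullptr;
}

// Releases all workers for one round and blocks until each has finished it.
void run_round()
{
    pthread_mutex_lock(&job_lock);
    jobs_completed = 0;
    ++round_no;
    pthread_cond_broadcast(&can_work);
    while (jobs_completed < NUM_THREADS)
        pthread_cond_wait(&jobs_done, &job_lock);
    pthread_mutex_unlock(&job_lock);
}

// Starts the workers, runs the given number of rounds, shuts down, and
// returns the total number of jobs done.
int run_rounds(int rounds)
{
    pthread_t t[NUM_THREADS];
    for (int i = 0; i < NUM_THREADS; ++i)
        pthread_create(&t[i], nullptr, job, nullptr);
    for (int r = 0; r < rounds; ++r)
        run_round();

    pthread_mutex_lock(&job_lock);
    stop = true;
    pthread_cond_broadcast(&can_work);
    pthread_mutex_unlock(&job_lock);
    for (int i = 0; i < NUM_THREADS; ++i)
        pthread_join(t[i], nullptr);
    return work_done;
}
```

Because main only starts the next round after all NUM_THREADS workers have reported, each worker handles each round exactly once, so run_rounds(3) returns 12.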

Also, here:

 pthread_cond_wait(&jobs_complete);

you probably mean:

pthread_cond_wait(&jobs_complete,&job_lock);

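Besides, pthread_cond_wait() expects a pthread_cond_t * and a pthread_mutex_t *, and jobs_complete is not a pthread_cond_t, so even then that code is clearly broken. The condition variable you declared for this purpose is jobs_done.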
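A minimal sketch of the corrected wait on main's side, assuming jobs_done is the condition variable and jobs_completed an int counter as described in the question (report_done() and wait_for_workers() are hypothetical stand-ins for the real workers and main; compile with -pthread). Main re-checks the counter in a loop, because a worker's signal can arrive before main starts waiting and wakeups can be spurious.

```cpp
#include <pthread.h>

static pthread_mutex_t job_lock  = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  jobs_done = PTHREAD_COND_INITIALIZER;
static int jobs_completed = 0;          // int counter guarded by job_lock

// Hypothetical worker: reports completion and wakes main.
static void* report_done(void*)
{
    pthread_mutex_lock(&job_lock);
    ++jobs_completed;
    pthread_cond_signal(&jobs_done);
    pthread_mutex_unlock(&job_lock);
    return nullptr;
}

// Starts n workers (at most 8) and returns once all of them have reported.
int wait_for_workers(int n)
{
    pthread_t t[8];
    if (n > 8) n = 8;
    for (int i = 0; i < n; ++i)
        pthread_create(&t[i], nullptr, report_done, nullptr);

    pthread_mutex_lock(&job_lock);
    while (jobs_completed < n)                     // re-check after every wakeup
        pthread_cond_wait(&jobs_done, &job_lock);  // (pthread_cond_t *, pthread_mutex_t *)
    int seen = jobs_completed;
    pthread_mutex_unlock(&job_lock);

    for (int i = 0; i < n; ++i)
        pthread_join(t[i], nullptr);
    return seen;
}
```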

Be aware that a signal or a broadcast on a condition variable only affects threads that are already waiting on it; the signal is not retained for future waits. So when the threads loop in the while (!jobs_complete) block and wait again, they will have to be signaled again to resume work.
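The following minimal sketch (hypothetical names throughout; compile with -pthread) shows why the lost signal matters and how a predicate fixes it: main broadcasts before any thread is waiting, so the broadcast itself wakes nobody, but because main also bumps a generation counter under the mutex, a worker that starts later sees the change and never blocks.

```cpp
#include <pthread.h>

static pthread_mutex_t g_mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  g_cv  = PTHREAD_COND_INITIALIZER;
static unsigned g_generation = 0;   // bumped with every broadcast, so it is never "lost"
static unsigned g_observed = 0;     // what the late worker saw once it got going

static void* late_worker(void* arg)
{
    unsigned last_served = *static_cast<unsigned*>(arg);
    pthread_mutex_lock(&g_mtx);
    // An unconditional pthread_cond_wait() here would block forever: the
    // broadcast fired before this thread started waiting and was not retained.
    while (g_generation == last_served)
        pthread_cond_wait(&g_cv, &g_mtx);
    g_observed = g_generation;
    pthread_mutex_unlock(&g_mtx);
    return nullptr;
}

// Broadcasts with no waiters, then starts a worker; returns the generation
// the worker observed after deciding not to block.
unsigned late_waiter_demo()
{
    unsigned before;
    pthread_mutex_lock(&g_mtx);
    before = g_generation;
    ++g_generation;
    pthread_cond_broadcast(&g_cv);   // no thread is waiting yet: this wakes nobody
    pthread_mutex_unlock(&g_mtx);

    pthread_t t;
    pthread_create(&t, nullptr, late_worker, &before);
    pthread_join(t, nullptr);        // completes because the predicate already changed
    return g_observed;
}
```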

Another thing: you describe jobs_completed as a bool and jobs_complete as an int, yet your code seems to use them the other way around:

        if(jobs_completed == NUM_THREADS)
        {
            jobs_complete = true;

Here's my advice: learn about the semaphore and barrier abstractions and, if you can, use existing implementations (for example Boost's boost::barrier, or std::barrier and std::counting_semaphore since C++20); otherwise, reimplement them using the pthread API. They will let you handle this situation much more easily than manipulating condition variables directly. Look on this very website for existing solutions. For example, this question deals with a very similar problem, and the solution I provided there can easily be modified to use the pthread API to match your requirements.
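As an example of reimplementing a barrier with the pthread API, here is a minimal reusable barrier built from one mutex and one condition variable, plus a driver that reproduces the question's pattern: main releases the workers at one barrier, then waits at a second until they have all finished the round. Barrier, barrier_demo() and the constants are hypothetical; where available, POSIX pthread_barrier_t or C++20 std::barrier could replace the hand-written class. Compile with -pthread.

```cpp
#include <pthread.h>

// Minimal reusable barrier (hypothetical type, not a library API).
struct Barrier {
    pthread_mutex_t mtx;
    pthread_cond_t  cv;
    int count;        // threads that must arrive per phase
    int waiting;      // threads that have arrived in the current phase
    unsigned phase;   // generation counter, so the barrier can be reused

    explicit Barrier(int n) : count(n), waiting(0), phase(0) {
        pthread_mutex_init(&mtx, nullptr);
        pthread_cond_init(&cv, nullptr);
    }
    ~Barrier() {
        pthread_cond_destroy(&cv);
        pthread_mutex_destroy(&mtx);
    }

    void arrive_and_wait() {
        pthread_mutex_lock(&mtx);
        unsigned my_phase = phase;
        if (++waiting == count) {          // last arrival opens the barrier
            waiting = 0;
            ++phase;
            pthread_cond_broadcast(&cv);
        } else {
            while (phase == my_phase)      // survives spurious wakeups
                pthread_cond_wait(&cv, &mtx);
        }
        pthread_mutex_unlock(&mtx);
    }
};

static const int WORKERS = 3;
static const int ROUNDS  = 5;
static Barrier start_b(WORKERS + 1);   // workers + main
static Barrier end_b(WORKERS + 1);
static int done_in_round[WORKERS];     // each worker writes only its own slot
static int ids[WORKERS];

static void* barrier_worker(void* arg)
{
    int id = *static_cast<int*>(arg);
    for (int r = 0; r < ROUNDS; ++r) {
        start_b.arrive_and_wait();     // plays the role of main's broadcast
        done_in_round[id] = r + 1;     // stand-in for real work
        end_b.arrive_and_wait();       // main cannot pass until everyone is here
    }
    return nullptr;
}

// Returns the number of rounds in which main saw every worker's result.
int barrier_demo()
{
    pthread_t t[WORKERS];
    for (int i = 0; i < WORKERS; ++i) {
        ids[i] = i;
        pthread_create(&t[i], nullptr, barrier_worker, &ids[i]);
    }
    int complete_rounds = 0;
    for (int r = 0; r < ROUNDS; ++r) {
        start_b.arrive_and_wait();     // release the workers for round r
        end_b.arrive_and_wait();       // block until all of them finished it
        bool all = true;
        for (int i = 0; i < WORKERS; ++i)
            all = all && done_in_round[i] == r + 1;
        if (all) ++complete_rounds;
    }
    for (int i = 0; i < WORKERS; ++i)
        pthread_join(t[i], nullptr);
    return complete_rounds;
}
```

The phase counter is what makes reuse safe: a fast thread that re-arrives before slower ones have left cannot trap them, because they wait for the phase to change rather than for the arrival count.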

answered Oct 18 '22 03:10 by didierc