Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to synchronize manager/worker pthreads without a join?

Tags:

c

posix

pthreads

I'm familiar with multithreading and I've developed many multithreaded programs in Java and Objective-C successfully. But I couldn't achieve the following in C using pthreads without using a join from the main thread:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define NUM_OF_THREADS 2

struct thread_data {
    int start;
    int end;
    int *arr;
};

void print(int *ints, int n);
void *processArray(void *args);

int main(int argc, const char * argv[])
{
    int numOfInts = 10;
    int *ints = malloc(numOfInts * sizeof(int));
    for (int i = 0; i < numOfInts; i++) {
        ints[i] = i;
    }
    print(ints, numOfInts); // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

    pthread_t threads[NUM_OF_THREADS];
    struct thread_data thread_data[NUM_OF_THREADS];

    // these vars are used to calculate the index ranges for each thread
    int remainingWork = numOfInts, amountOfWork;
    int startRange, endRange = -1;

    for (int i = 0; i < NUM_OF_THREADS; i++) {

        amountOfWork = remainingWork / (NUM_OF_THREADS - i);
        startRange = endRange + 1;
        endRange   = startRange + amountOfWork - 1;

        thread_data[i].arr   = ints;
        thread_data[i].start = startRange;
        thread_data[i].end   = endRange;

        pthread_create(&threads[i], NULL, processArray, (void *)&thread_data[i]);

        remainingWork -= amountOfWork;      
    }

    // 1. Signal to the threads to start working


    // 2. Wait for them to finish


    print(ints, numOfInts); // should print [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

    free(ints);
    return 0;
}

void *processArray(void *args)
{
    struct thread_data *data = (struct thread_data *)args;
    int *arr  = data->arr;
    int start = data->start;
    int end   = data->end;

    // 1. Wait for a signal to start from the main thread


    for (int i = start; i <= end; i++) {
        arr[i] = arr[i] + 1;
    }

    // 2. Signal to the main thread that you're done

    pthread_exit(NULL);
}

void print(int *ints, int n)
{
    printf("[");
    for (int i = 0; i < n; i++) {
        printf("%d", ints[i]);
        if (i+1 != n)
            printf(", ");
    }
    printf("]\n");
}

I would like to achieve the following in the above code:

In main():

  1. Signal to the threads to start working.
  2. Wait for the background threads to finish.

In processArray():

  1. Wait for a signal to start from the main thread
  2. Signal to the main thread that you're done

I don't want to use a join in the main thread because in the real application, the main thread will create the threads once, and then it will signal to the background threads to work many times, and I can't let the main thread proceed unless all the background threads have finished working. In the processArray function, I will put an infinite loop as following:

void *processArray(void *args)
{
    struct thread_data *data = (struct thread_data *)args;

    while (1)
    {
      // 1. Wait for a signal to start from the main thread

      int *arr  = data->arr;
      int start = data->start;
      int end   = data->end;          

      // Process
      for (int i = start; i <= end; i++) {
          arr[i] = arr[i] + 1;
      }

      // 2. Signal to the main thread that you're done

    }

    pthread_exit(NULL);
}

Note that I'm new to C and the posix API, so excuse me if I'm missing something obvious. But I really tried many things, starting from using a mutex, and an array of semaphores, and a mixture of both, but without success. I think a condition variable may help, but I couldn't understand how it could be used.

Thanks for your time.

Problem Solved:

Thank you guys so much! I was finally able to get this to work safely and without using a join by following your tips. Although the solution is somewhat ugly, it gets the job done and the performance gains is worth it (as you'll see below). For anyone interested, this is a simulation of the real application I'm working on, in which the main thread keeps giving work continuously to the background threads:

 #include <stdio.h>
 #include <stdlib.h>
 #include <pthread.h>

 #define NUM_OF_THREADS 5

 struct thread_data {
     int id;
     int start;
     int end;
     int *arr;
 };

 pthread_mutex_t currentlyIdleMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t  currentlyIdleCond  = PTHREAD_COND_INITIALIZER;
 int currentlyIdle;

 pthread_mutex_t workReadyMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t  workReadyCond  = PTHREAD_COND_INITIALIZER;
 int workReady;

 pthread_cond_t  currentlyWorkingCond = PTHREAD_COND_INITIALIZER;
 pthread_mutex_t currentlyWorkingMutex= PTHREAD_MUTEX_INITIALIZER;
 int currentlyWorking;

 pthread_mutex_t canFinishMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t  canFinishCond  = PTHREAD_COND_INITIALIZER;
 int canFinish;

 void print(int *ints, int n);
 void *processArray(void *args);
 int validateResult(int *ints, int num, int start);

 int main(int argc, const char * argv[])
 {
     int numOfInts = 10;
     int *ints = malloc(numOfInts * sizeof(int));
     for (int i = 0; i < numOfInts; i++) {
         ints[i] = i;
     }
 //   print(ints, numOfInts);

     pthread_t threads[NUM_OF_THREADS];
     struct thread_data thread_data[NUM_OF_THREADS];
     workReady = 0;
     canFinish = 0;
     currentlyIdle = 0;
     currentlyWorking = 0;

     // these vars are used to calculate the index ranges for each thread
     int remainingWork = numOfInts, amountOfWork;
     int startRange, endRange = -1;
     // Create the threads and give each one its data struct.
     for (int i = 0; i < NUM_OF_THREADS; i++) {

         amountOfWork = remainingWork / (NUM_OF_THREADS - i);
         startRange = endRange + 1;
         endRange   = startRange + amountOfWork - 1;

         thread_data[i].id    = i;
         thread_data[i].arr   = ints;
         thread_data[i].start = startRange;
         thread_data[i].end   = endRange;

         pthread_create(&threads[i], NULL, processArray, (void *)&thread_data[i]);
         remainingWork -= amountOfWork;
     }

     int loops = 1111111;
     int expectedStartingValue = ints[0] + loops; // used to validate the results
     // The elements in ints[] should be incremented by 1 in each loop
     while (loops-- != 0) {

         // Make sure all of them are ready
         pthread_mutex_lock(&currentlyIdleMutex);
         while (currentlyIdle != NUM_OF_THREADS) {
             pthread_cond_wait(&currentlyIdleCond, &currentlyIdleMutex);
         }
         pthread_mutex_unlock(&currentlyIdleMutex);

         // All threads are now blocked; it's safe to not lock the mutex.
         // Prevent them from finishing before authorized.
         canFinish = 0;
         // Reset the number of currentlyWorking threads
         currentlyWorking = NUM_OF_THREADS;

         // Signal to the threads to start
         pthread_mutex_lock(&workReadyMutex);
         workReady = 1;
         pthread_cond_broadcast(&workReadyCond );
         pthread_mutex_unlock(&workReadyMutex);      

         // Wait for them to finish
         pthread_mutex_lock(&currentlyWorkingMutex);
         while (currentlyWorking != 0) {
             pthread_cond_wait(&currentlyWorkingCond, &currentlyWorkingMutex);
         }
         pthread_mutex_unlock(&currentlyWorkingMutex);

         // The threads are now waiting for permission to finish
         // Prevent them from starting again
         workReady = 0;
         currentlyIdle = 0;

         // Allow them to finish
         pthread_mutex_lock(&canFinishMutex);
         canFinish = 1;
         pthread_cond_broadcast(&canFinishCond);
         pthread_mutex_unlock(&canFinishMutex);
     }

 //   print(ints, numOfInts);

     if (validateResult(ints, numOfInts, expectedStartingValue)) {
         printf("Result correct.\n");
     }
     else {
         printf("Result invalid.\n");      
     }

     // clean up
     for (int i = 0; i < NUM_OF_THREADS; i++) {
         pthread_cancel(threads[i]);
     }
     free(ints);

     return 0;
 }

 void *processArray(void *args)
 {
     struct thread_data *data = (struct thread_data *)args;
     int *arr  = data->arr;
     int start = data->start;
     int end   = data->end;

     while (1) {

         // Set yourself as idle and signal to the main thread, when all threads are idle main will start
         pthread_mutex_lock(&currentlyIdleMutex);
         currentlyIdle++;
         pthread_cond_signal(&currentlyIdleCond);
         pthread_mutex_unlock(&currentlyIdleMutex);

         // wait for work from main
         pthread_mutex_lock(&workReadyMutex);
         while (!workReady) {
             pthread_cond_wait(&workReadyCond , &workReadyMutex);
         }
         pthread_mutex_unlock(&workReadyMutex);

         // Do the work
         for (int i = start; i <= end; i++) {
             arr[i] = arr[i] + 1;
         }

         // mark yourself as finished and signal to main
         pthread_mutex_lock(&currentlyWorkingMutex);
         currentlyWorking--;
         pthread_cond_signal(&currentlyWorkingCond);
         pthread_mutex_unlock(&currentlyWorkingMutex);

         // Wait for permission to finish
         pthread_mutex_lock(&canFinishMutex);
         while (!canFinish) {
             pthread_cond_wait(&canFinishCond , &canFinishMutex);
         }
         pthread_mutex_unlock(&canFinishMutex);
     }

     pthread_exit(NULL);
 }

 int validateResult(int *ints, int n, int start)
 {
     int tmp = start;
     for (int i = 0; i < n; i++, tmp++) {
         if (ints[i] != tmp) {
             return 0;
         }
     }
     return 1;
 }

 void print(int *ints, int n)
 {
     printf("[");
     for (int i = 0; i < n; i++) {
         printf("%d", ints[i]);
         if (i+1 != n)
             printf(", ");
     }
     printf("]\n");
 }

I'm not sure though if pthread_cancel is enough for clean up! As for the barrier, it would've been of a great help if it wasn't limited to some OSs as mentioned by @Jeremy.

Benchmarks:

I wanted to make sure that these many conditions aren't actually slowing down the algorithm, so I've setup this benchmark to compare the two solutions:

 #include <stdio.h>
 #include <stdlib.h>
 #include <pthread.h>
 #include <unistd.h>
 #include <sys/time.h>
 #include <sys/resource.h>

 #define NUM_OF_THREADS 5
 struct thread_data {
     int start;
     int end;
     int *arr;
 };
 pthread_mutex_t currentlyIdleMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t  currentlyIdleCond  = PTHREAD_COND_INITIALIZER;
 int currentlyIdle;
 pthread_mutex_t workReadyMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t  workReadyCond  = PTHREAD_COND_INITIALIZER;
 int workReady;
 pthread_cond_t  currentlyWorkingCond = PTHREAD_COND_INITIALIZER;
 pthread_mutex_t currentlyWorkingMutex= PTHREAD_MUTEX_INITIALIZER;
 int currentlyWorking;
 pthread_mutex_t canFinishMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t  canFinishCond  = PTHREAD_COND_INITIALIZER;
 int canFinish;

 void *processArrayMutex(void *args);
 void *processArrayJoin(void *args);
 double doItWithMutex(pthread_t *threads, struct thread_data *data, int loops);
 double doItWithJoin(pthread_t *threads, struct thread_data *data, int loops);

 int main(int argc, const char * argv[])
 {
     int numOfInts = 10;
     int *join_ints = malloc(numOfInts * sizeof(int));
     int *mutex_ints = malloc(numOfInts * sizeof(int));
     for (int i = 0; i < numOfInts; i++) {
         join_ints[i] = i;
         mutex_ints[i] = i;
     }

     pthread_t join_threads[NUM_OF_THREADS];
     pthread_t mutex_threads[NUM_OF_THREADS];
     struct thread_data join_thread_data[NUM_OF_THREADS];
     struct thread_data mutex_thread_data[NUM_OF_THREADS];
     workReady = 0;
     canFinish = 0;
     currentlyIdle = 0;
     currentlyWorking = 0;

     int remainingWork = numOfInts, amountOfWork;
     int startRange, endRange = -1;
     for (int i = 0; i < NUM_OF_THREADS; i++) {
         amountOfWork = remainingWork / (NUM_OF_THREADS - i);
         startRange = endRange + 1;
         endRange   = startRange + amountOfWork - 1;

         join_thread_data[i].arr   = join_ints;
         join_thread_data[i].start = startRange;
         join_thread_data[i].end   = endRange;
         mutex_thread_data[i].arr   = mutex_ints;
         mutex_thread_data[i].start = startRange;
         mutex_thread_data[i].end   = endRange;

         pthread_create(&mutex_threads[i], NULL, processArrayMutex, (void *)&mutex_thread_data[i]);
         remainingWork -= amountOfWork;
     }

     int numOfBenchmarkTests = 100;
     int numberOfLoopsPerTest= 1000;

     double join_sum = 0.0, mutex_sum = 0.0;
     for (int i = 0; i < numOfBenchmarkTests; i++)
     {
         double joinTime = doItWithJoin(join_threads, join_thread_data, numberOfLoopsPerTest);
         double mutexTime= doItWithMutex(mutex_threads, mutex_thread_data, numberOfLoopsPerTest);

         join_sum += joinTime;
         mutex_sum+= mutexTime;      
     }

     double join_avg = join_sum / numOfBenchmarkTests;
     double mutex_avg= mutex_sum / numOfBenchmarkTests;

     printf("Join average : %f\n", join_avg);
     printf("Mutex average: %f\n", mutex_avg);

     double diff = join_avg - mutex_avg;
     if (diff > 0.0)
         printf("Mutex is %.0f%% faster.\n", 100 * diff / join_avg);
     else if (diff < 0.0)
         printf("Join  is %.0f%% faster.\n", 100 * diff / mutex_avg);
     else
         printf("Both have the same performance.");

     free(join_ints);
     free(mutex_ints);

     return 0;
 }

 // From https://stackoverflow.com/a/2349941/408286
 double get_time()
 {
     struct timeval t;
     struct timezone tzp;
     gettimeofday(&t, &tzp);
     return t.tv_sec + t.tv_usec*1e-6;
 }

 double doItWithMutex(pthread_t *threads, struct thread_data *data, int num_loops)
 {
     double start = get_time();

     int loops = num_loops;
     while (loops-- != 0) {
         // Make sure all of them are ready
         pthread_mutex_lock(&currentlyIdleMutex);
         while (currentlyIdle != NUM_OF_THREADS) {
             pthread_cond_wait(&currentlyIdleCond, &currentlyIdleMutex);
         }
         pthread_mutex_unlock(&currentlyIdleMutex);

         // All threads are now blocked; it's safe to not lock the mutex.
         // Prevent them from finishing before authorized.
         canFinish = 0;
         // Reset the number of currentlyWorking threads
         currentlyWorking = NUM_OF_THREADS;

         // Signal to the threads to start
         pthread_mutex_lock(&workReadyMutex);
         workReady = 1;
         pthread_cond_broadcast(&workReadyCond );
         pthread_mutex_unlock(&workReadyMutex);

         // Wait for them to finish
         pthread_mutex_lock(&currentlyWorkingMutex);
         while (currentlyWorking != 0) {
             pthread_cond_wait(&currentlyWorkingCond, &currentlyWorkingMutex);
         }
         pthread_mutex_unlock(&currentlyWorkingMutex);

         // The threads are now waiting for permission to finish
         // Prevent them from starting again
         workReady = 0;
         currentlyIdle = 0;

         // Allow them to finish
         pthread_mutex_lock(&canFinishMutex);
         canFinish = 1;
         pthread_cond_broadcast(&canFinishCond);
         pthread_mutex_unlock(&canFinishMutex);
     }

     return get_time() - start;
 }

 double doItWithJoin(pthread_t *threads, struct thread_data *data, int num_loops)
 {
     double start = get_time();

     int loops = num_loops;
     while (loops-- != 0) {
         // create them
         for (int i = 0; i < NUM_OF_THREADS; i++) {
             pthread_create(&threads[i], NULL, processArrayJoin, (void *)&data[i]);
         }
         // wait
         for (int i = 0; i < NUM_OF_THREADS; i++) {
             pthread_join(threads[i], NULL);
         }
     }

     return get_time() - start;
 }

 void *processArrayMutex(void *args)
 {
     struct thread_data *data = (struct thread_data *)args;
     int *arr  = data->arr;
     int start = data->start;
     int end   = data->end;

     while (1) {

         // Set yourself as idle and signal to the main thread, when all threads are idle main will start
         pthread_mutex_lock(&currentlyIdleMutex);
         currentlyIdle++;
         pthread_cond_signal(&currentlyIdleCond);
         pthread_mutex_unlock(&currentlyIdleMutex);

         // wait for work from main
         pthread_mutex_lock(&workReadyMutex);
         while (!workReady) {
             pthread_cond_wait(&workReadyCond , &workReadyMutex);
         }
         pthread_mutex_unlock(&workReadyMutex);

         // Do the work
         for (int i = start; i <= end; i++) {
             arr[i] = arr[i] + 1;
         }

         // mark yourself as finished and signal to main
         pthread_mutex_lock(&currentlyWorkingMutex);
         currentlyWorking--;
         pthread_cond_signal(&currentlyWorkingCond);
         pthread_mutex_unlock(&currentlyWorkingMutex);

         // Wait for permission to finish
         pthread_mutex_lock(&canFinishMutex);
         while (!canFinish) {
             pthread_cond_wait(&canFinishCond , &canFinishMutex);
         }
         pthread_mutex_unlock(&canFinishMutex);
     }

     pthread_exit(NULL);
 }

 void *processArrayJoin(void *args)
 {
     struct thread_data *data = (struct thread_data *)args;
     int *arr  = data->arr;
     int start = data->start;
     int end   = data->end;

     // Do the work
     for (int i = start; i <= end; i++) {
         arr[i] = arr[i] + 1;
     }

     pthread_exit(NULL);
 }

And the output is:

Join average : 0.153074
Mutex average: 0.071588
Mutex is 53% faster.

Thank you again. I really appreciate your help!

like image 985
mota Avatar asked Sep 05 '12 13:09

mota


3 Answers

There are several synchronization mechanisms you can use (condition variables, for example). I think the simplest would be to use a pthread_barrier to synchronize the the start of the threads.

Assuming that you want all of the threads to 'sync up' on each loop iteration, you can just reuse the barrier. If you need something more flexible, a condition variable might be more appropriate.

When you decide it's time for the thread to wrap up (you haven't indicated how the threads will know to break out of the infinite loop - a simple shared variable might be used for that; the shared variable could be an atomic type or protected with a mutex), the main() thread should use pthread_join() to wait for all the threads to complete.

like image 142
Michael Burr Avatar answered Nov 19 '22 07:11

Michael Burr


You need to use a different synchronization technique than join, that's clear.

Unfortunately you have a lot of options. One is a "synchronization barrier", which basically is a thing where each thread that reaches it blocks until they've all reached it (you specify the number of threads in advance). Look at pthread_barrier.

Another is to use a condition-variable/mutex pair (pthread_cond_*). When each thread finishes it takes the mutex, increments a count, signals the condvar. The main thread waits on the condvar until the count reaches the value it expects. The code looks like this:

// thread has finished
mutex_lock
++global_count
// optional optimization: only execute the next line when global_count >= N
cond_signal
mutex_unlock

// main is waiting for N threads to finish
mutex_lock
while (global_count < N) {
    cond_wait
}
mutex_unlock

Another is to use a semaphore per thread -- when the thread finishes it posts its own semaphore, and the main thread waits on each semaphore in turn instead of joining each thread in turn.

You also need synchronization to re-start the threads for the next job -- this could be a second synchronization object of the same type as the first, with details changed for the fact that you have 1 poster and N waiters rather than the other way around. Or you could (with care) re-use the same object for both purposes.

If you've tried these things and your code didn't work, maybe ask a new specific question about the code you tried. All of them are adequate to the task.

like image 30
Steve Jessop Avatar answered Nov 19 '22 06:11

Steve Jessop


You are working at the wrong level of abstraction. This problem has been solved already. You are reimplementing a work queue + thread pool.

OpenMP seems like a good fit for your problem. It converts #pragma annotations into threaded code. I believe it would let you express what you're trying to do pretty directly.

Using libdispatch, what you're trying to do would be expressed as a dispatch_apply targeting a concurrent queue. This implicitly waits for all child tasks to complete. Under OS X, it's implemented using a non-portable pthread workqueue interface; under FreeBSD, I believe it manages a group of pthreads directly.

If it is portability concerns driving you to use raw pthreads, don't use pthread barriers. Barriers are an additional extension over and above basic POSIX threads. OS X for example does not support it. For more, see POSIX.

Blocking the main thread till all child threads have completed can be done using a count protected by a condition variable or, even more simply, using a pipe and a blocking read where the number of bytes to read matches the number of threads. Each thread writes one byte on work completion, then sleeps till it gets new work from the main thread. The main thread unblocks once each thread has written its "I'm done!" byte.

Passing work to the child threads can be done using a mutex protecting the work-descriptor and a condition to signal new work. You could use a single array of work descriptors that all threads draw from. On signal, each one tries to grab the mutex. On grabbing the mutex, it would dequeue some work, signal anew if the queue is nonempty, and then process its work, after which it would signal completion to the master thread.

You could reuse this "work queue" to unblock the main thread by enqueueing the results, with the main thread waiting till the result queue length matches the number of threads; the pipe approach is just using a blocking read to do this count for you.

like image 2
Jeremy W. Sherman Avatar answered Nov 19 '22 07:11

Jeremy W. Sherman