Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

The version of pthread_join() that does not block main(): POSIX

I am trying to write a code that does not block main() when pthread_join() is called: i.e. basically trying to implement my previous question mentioned below:

https://stackoverflow.com/questions/24509500/pthread-join-and-main-blocking-multithreading

And the corresponding explanation at:

pthreads - Join on group of threads, wait for one to exit

As per suggested answer:

You'd need to create your own version of it - e.g. an array of flags (one flag per thread) protected by a mutex and a condition variable; where just before "pthread_exit()" each thread acquires the mutex, sets its flag, then does "pthread_cond_signal()". The main thread waits for the signal, then checks the array of flags to determine which thread/s to join (there may be more than one thread to join by then).

I have tried as below:

My status array which keeps a track of which threads have finished:

typedef struct {
    int Finish_Status[THREAD_NUM];
    int signalled;

    pthread_mutex_t mutex;
    pthread_cond_t FINISHED;
    }THREAD_FINISH_STATE;

The thread routine, it sets the corresponding array element when the thread finishes and also signals the condition variable:

void* THREAD_ROUTINE(void* arg)
{
    THREAD_ARGUMENT* temp=(THREAD_ARGUMENT*) arg;
    printf("Thread created with id %d\n",temp->id);
    waitFor(5);
    pthread_mutex_lock(&(ThreadFinishStatus.mutex));
    ThreadFinishStatus.Finish_Status[temp->id]=TRUE;
    ThreadFinishStatus.signalled=TRUE;
    if(ThreadFinishStatus.signalled==TRUE)
    {
      pthread_cond_signal(&(ThreadFinishStatus.FINISHED));
      printf("Signal that thread %d finished\n",temp->id);
     }
    pthread_mutex_unlock(&(ThreadFinishStatus.mutex));

    pthread_exit((void*)(temp->id));
    }

I am not able to write the corresponding parts pthread_join() and pthread_cond_wait() functions. There are a few things which I am not able to implement.

1) How to write corresponding part pthread_cond_wait() in my main()?

2) I am trying to write it as:

   pthread_mutex_lock(&(ThreadFinishStatus.mutex));
    while((ThreadFinishStatus.signalled != TRUE){
     pthread_cond_wait(&(ThreadFinishStatus.FINISHED), &(ThreadFinishStatus.mutex));
     printf("Main Thread signalled\n");
     ThreadFinishStatus.signalled==FALSE; //Reset signalled
     //check which thread to join
    }
    pthread_mutex_unlock(&(ThreadFinishStatus.mutex)); 

But it does not enter the while loop.

3) How to use pthread_join() so that I can get the return value stored in my arg[i].returnStatus i.e. where to put below statement in my main:

`pthread_join(T[i],&(arg[i].returnStatus));`

COMPLETE CODE

#include <stdio.h>
#include <pthread.h>
#include <time.h>

#define THREAD_NUM 5
#define FALSE 0
#define TRUE 1


void waitFor (unsigned int secs) {
    time_t retTime;
    retTime = time(0) + secs;     // Get finishing time.
    while (time(0) < retTime);    // Loop until it arrives.
}

typedef struct {
    int Finish_Status[THREAD_NUM];
    int signalled;

    pthread_mutex_t mutex;
    pthread_cond_t FINISHED;
    }THREAD_FINISH_STATE;

typedef struct {
    int id;
    void* returnStatus;
    }THREAD_ARGUMENT;

THREAD_FINISH_STATE ThreadFinishStatus;

void initializeState(THREAD_FINISH_STATE* state)
{
 int i=0;
 state->signalled=FALSE;
 for(i=0;i<THREAD_NUM;i++)
 {
     state->Finish_Status[i]=FALSE;
     }
 pthread_mutex_init(&(state->mutex),NULL);
 pthread_cond_init(&(state->FINISHED),NULL);
    }

void destroyState(THREAD_FINISH_STATE* state)
{
 int i=0;
 for(i=0;i<THREAD_NUM;i++)
 {
     state->Finish_Status[i]=FALSE;
     }
 pthread_mutex_destroy(&(state->mutex));
 pthread_cond_destroy(&(state->FINISHED));
    }


void* THREAD_ROUTINE(void* arg)
{
    THREAD_ARGUMENT* temp=(THREAD_ARGUMENT*) arg;
    printf("Thread created with id %d\n",temp->id);
    waitFor(5);
    pthread_mutex_lock(&(ThreadFinishStatus.mutex));
    ThreadFinishStatus.Finish_Status[temp->id]=TRUE;
    ThreadFinishStatus.signalled=TRUE;
    if(ThreadFinishStatus.signalled==TRUE)
    {
      pthread_cond_signal(&(ThreadFinishStatus.FINISHED));
      printf("Signal that thread %d finished\n",temp->id);
     }
    pthread_mutex_unlock(&(ThreadFinishStatus.mutex));

    pthread_exit((void*)(temp->id));
    }

int main()
{
    THREAD_ARGUMENT arg[THREAD_NUM];
    pthread_t T[THREAD_NUM];
    int i=0;
    initializeState(&ThreadFinishStatus);

    for(i=0;i<THREAD_NUM;i++)
    {
        arg[i].id=i;
        }

    for(i=0;i<THREAD_NUM;i++)
        {
            pthread_create(&T[i],NULL,THREAD_ROUTINE,(void*)&arg[i]);
        }

    /*
     Join only if signal received
    */

    pthread_mutex_lock(&(ThreadFinishStatus.mutex));
    //Wait
    while((ThreadFinishStatus.signalled != TRUE){
     pthread_cond_wait(&(ThreadFinishStatus.FINISHED), &(ThreadFinishStatus.mutex));
     printf("Main Thread signalled\n");
     ThreadFinishStatus.signalled==FALSE; //Reset signalled
     //check which thread to join
    }
    pthread_mutex_unlock(&(ThreadFinishStatus.mutex));

    destroyState(&ThreadFinishStatus);
    return 0;
    }
like image 374
Gaurav K Avatar asked Dec 12 '25 10:12

Gaurav K


1 Answers

Here is an example of a program that uses a counting semaphore to watch as threads finish, find out which thread it was, and review some result data from that thread. This program is efficient with locks - waiters are not spuriously woken up (notice how the threads only post to the semaphore after they've released the mutex protecting shared state).

This design allows the main program to process the result from some thread's computation immediately after the thread completes, and does not require the main wait for all threads to complete. This would be especially helpful if the running time of each thread varied by a significant amount.

Most importantly, this program does not deadlock nor race.

#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h>
#include <stdio.h>
#include <queue>

void* ThreadEntry(void* args );

typedef struct {
    int threadId;
    pthread_t thread;
    int threadResult;
} ThreadState;

sem_t           completionSema;

pthread_mutex_t resultMutex;

std::queue<int> threadCompletions;

ThreadState* threadInfos;

int main() {

    int numThreads = 10;
    int* threadResults;
    void* threadResult;
    int doneThreadId;


    sem_init( &completionSema, 0, 0 );

    pthread_mutex_init( &resultMutex, 0 );

    threadInfos = new ThreadState[numThreads];

    for ( int i = 0; i < numThreads; i++ ) {

        threadInfos[i].threadId = i;
        pthread_create( &threadInfos[i].thread, NULL, &ThreadEntry, &threadInfos[i].threadId  );
    }

    for ( int i = 0; i < numThreads; i++ ) {
        // Wait for any one thread to complete; ie, wait for someone
        // to queue to the threadCompletions queue.
        sem_wait( &completionSema );


        // Find out what was queued; queue is accessed from multiple threads,
        // so protect with a vanilla mutex.
        pthread_mutex_lock(&resultMutex);
        doneThreadId = threadCompletions.front();
        threadCompletions.pop();
        pthread_mutex_unlock(&resultMutex);

        // Announce which thread ID we saw finish
        printf(
            "Main saw TID %d finish\n\tThe thread's result was %d\n",
            doneThreadId,
            threadInfos[doneThreadId].threadResult
        );

        // pthread_join to clean up the thread.
        pthread_join( threadInfos[doneThreadId].thread, &threadResult );
    }

    delete threadInfos;

    pthread_mutex_destroy( &resultMutex );
    sem_destroy( &completionSema );

}

void* ThreadEntry(void* args ) {
    int threadId = *((int*)args);

    printf("hello from thread %d\n", threadId );

    // This can safely be accessed since each thread has its own space
    // and array derefs are thread safe.
    threadInfos[threadId].threadResult = rand() % 1000;


    pthread_mutex_lock( &resultMutex );
    threadCompletions.push( threadId );
    pthread_mutex_unlock( &resultMutex );

    sem_post( &completionSema );

    return 0;
}
like image 120
antiduh Avatar answered Dec 14 '25 07:12

antiduh