Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

pthread synchronized blocking queue

I'm looking for a recommended implementation of a thread-safe blocking queue (multi producer/consumer) in C using pthread synchronization semantics.

like image 691
Mike Avatar asked Jan 02 '11 10:01

Mike


2 Answers

Try APR queues. It's used by the apache web server and pretty well tested.

http://apr.apache.org/docs/apr-util/1.3/apr__queue_8h.html

like image 25
Missaka Wijekoon Avatar answered Nov 14 '22 03:11

Missaka Wijekoon


Here's one I use

threadqueue.h

#ifndef _THREADQUEUE_H_
#define _THREADQUEUE_H_ 1

#include <pthread.h>

#ifdef __cplusplus
extern "C" {
#endif
/**
 * @defgroup ThreadQueue ThreadQueue
 *
 * Little API for waitable queues, typically used for passing messages
 * between threads.
 *
 */

/**
 * @mainpage
   */

/**
 * A thread message.
 *
 * @ingroup ThreadQueue
 *
 * This is used for passing to #thread_queue_get for retreive messages.
 * the date is stored in the data member, the message type in the  #msgtype.
 *
 * Typical:
 * @code
 * struct threadmsg;
 * struct myfoo *foo;
 * while(1)
 *      ret = thread_queue_get(&queue,NULL,&message);
 *      ..
 *      foo = msg.data;
 *      switch(msg.msgtype){
 *              ...
 *      }
 * }
 * @endcode
 *
 */
struct threadmsg{
        /**
         * Holds the data.
         */
        void *data;
        /**
         * Holds the messagetype
         */
        long msgtype;
        /**
        * Holds the current queue lenght. Might not be meaningful if there's several readers
        */
        long qlength;

};


/**
 * A TthreadQueue
 *
 * @ingroup ThreadQueue
 *
 * You should threat this struct as opaque, never ever set/get any
 * of the variables. You have been warned.
 */
struct threadqueue {
/**
 * Length of the queue, never set this, never read this.
 * Use #threadqueue_length to read it.
 */
        long length;
/**
 * Mutex for the queue, never touch.
 */
        pthread_mutex_t mutex;
/**
 * Condition variable for the queue, never touch.
 */
        pthread_cond_t cond;
/**
 * Internal pointers for the queue, never touch.
 */
        struct msglist *first,*last;
/**
 * Internal cache of msglists
 */
    struct msglist *msgpool;
/**
 * No. of elements in the msgpool
 */
    long msgpool_length;
};

/**
 * Initializes a queue.
 *
 * @ingroup ThreadQueue
 *
 * thread_queue_init initializes a new threadqueue. A new queue must always
 * be initialized before it is used.
 *
 * @param queue Pointer to the queue that should be initialized
 * @return 0 on success see pthread_mutex_init
 */
int thread_queue_init(struct threadqueue *queue);

/**
 * Adds a message to a queue
 *
 * @ingroup ThreadQueue
 *
 * thread_queue_add adds a "message" to the specified queue, a message
 * is just a pointer to a anything of the users choice. Nothing is copied
 * so the user must keep track on (de)allocation of the data.
 * A message type is also specified, it is not used for anything else than
 * given back when a message is retreived from the queue.
 *
 * @param queue Pointer to the queue on where the message should be added.
 * @param data the "message".
 * @param msgtype a long specifying the message type, choice of the user.
 * @return 0 on succes ENOMEM if out of memory EINVAL if queue is NULL
 */
int thread_queue_add(struct threadqueue *queue, void *data, long msgtype);

/**
 * Gets a message from a queue
 *
 * @ingroup ThreadQueue
 *
 * thread_queue_get gets a message from the specified queue, it will block
 * the caling thread untill a message arrives, or the (optional) timeout occurs.
 * If timeout is NULL, there will be no timeout, and thread_queue_get will wait
 * untill a message arrives.
 *
 * struct timespec is defined as:
 * @code
 *      struct timespec {
 *                 long    tv_sec;         // seconds
 *                 long    tv_nsec;        // nanoseconds
 *             };
 * @endcode
 *
 * @param queue Pointer to the queue to wait on for a message.
 * @param timeout timeout on how long to wait on a message
 * @param msg pointer that is filled in with mesagetype and data
 *
 * @return 0 on success EINVAL if queue is NULL ETIMEDOUT if timeout occurs
 */
int thread_queue_get(struct threadqueue *queue, const struct timespec *timeout, struct threadmsg *msg);


/**
 * Gets the length of a queue
 *
 * @ingroup ThreadQueue
 *
 * threadqueue_length returns the number of messages waiting in the queue
 *
 * @param queue Pointer to the queue for which to get the length
 * @return the length(number of pending messages) in the queue
 */
long thread_queue_length( struct threadqueue *queue );

/**
 * @ingroup ThreadQueue
 * Cleans up the queue.
 *
 * threadqueue_cleanup cleans up and destroys the queue.
 * This will remove all messages from a queue, and reset it. If
 * freedata is != 0 free(3) will be called on all pending messages in the queue
 * You cannot call this if there are someone currently adding or getting messages
 * from the queue.
 * After a queue have been cleaned, it cannot be used again untill #thread_queue_init
 * has been called on the queue.
 *
 * @param queue Pointer to the queue that should be cleaned
 * @param freedata set to nonzero if free(3) should be called on remaining
 * messages
 * @return 0 on success EINVAL if queue is NULL EBUSY if someone is holding any locks on the queue
 */
int thread_queue_cleanup(struct threadqueue *queue, int freedata);

#ifdef __cplusplus
}
#endif

#endif

threadqueue.c

#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <sys/time.h>
#include "../h/threadqueue.h"


#define MSGPOOL_SIZE 256

struct msglist {
    struct threadmsg msg;
    struct msglist *next;
};

static inline struct msglist *get_msglist(struct threadqueue *queue)
{
    struct msglist *tmp;

    if(queue->msgpool != NULL) {
        tmp = queue->msgpool;
        queue->msgpool = tmp->next;
        queue->msgpool_length--;
    } else {
        tmp = malloc(sizeof *tmp);
    }

    return tmp;
}

static inline void release_msglist(struct threadqueue *queue,struct msglist *node)
{

    if(queue->msgpool_length > ( queue->length/8 + MSGPOOL_SIZE)) {
        free(node);
    } else {
        node->msg.data = NULL;
        node->msg.msgtype = 0;
        node->next = queue->msgpool;
        queue->msgpool = node;
        queue->msgpool_length++;
    }
    if(queue->msgpool_length > (queue->length/4 + MSGPOOL_SIZE*10)) {
        struct msglist *tmp = queue->msgpool;
        queue->msgpool = tmp->next;
        free(tmp);
        queue->msgpool_length--;
    }
}

int thread_queue_init(struct threadqueue *queue)
{
    int ret = 0;
    if (queue == NULL) {
        return EINVAL;
    }
    memset(queue, 0, sizeof(struct threadqueue));
    ret = pthread_cond_init(&queue->cond, NULL);
    if (ret != 0) {
        return ret;
    }

    ret = pthread_mutex_init(&queue->mutex, NULL);
    if (ret != 0) {
        pthread_cond_destroy(&queue->cond);
        return ret;
    }

    return 0;

}

int thread_queue_add(struct threadqueue *queue, void *data, long msgtype)
{
    struct msglist *newmsg;
    pthread_mutex_lock(&queue->mutex);
    newmsg = get_msglist(queue);
    if (newmsg == NULL) {
        pthread_mutex_unlock(&queue->mutex);
        return ENOMEM;
    }
    newmsg->msg.data = data;
    newmsg->msg.msgtype = msgtype;

    newmsg->next = NULL;
    if (queue->last == NULL) {
        queue->last = newmsg;
        queue->first = newmsg;
    } else {
        queue->last->next = newmsg;
        queue->last = newmsg;
    }

        if(queue->length == 0)
                pthread_cond_broadcast(&queue->cond);
    queue->length++;
    pthread_mutex_unlock(&queue->mutex);

    return 0;

}

int thread_queue_get(struct threadqueue *queue, const struct timespec *timeout, struct threadmsg *msg)
{
    struct msglist *firstrec;
    int ret = 0;
    struct timespec abstimeout;

    if (queue == NULL || msg == NULL) {
        return EINVAL;
    }
    if (timeout) {
        struct timeval now;

        gettimeofday(&now, NULL);
        abstimeout.tv_sec = now.tv_sec + timeout->tv_sec;
        abstimeout.tv_nsec = (now.tv_usec * 1000) + timeout->tv_nsec;
        if (abstimeout.tv_nsec >= 1000000000) {
            abstimeout.tv_sec++;
            abstimeout.tv_nsec -= 1000000000;
        }
    }

    pthread_mutex_lock(&queue->mutex);

    /* Will wait until awakened by a signal or broadcast */
    while (queue->first == NULL && ret != ETIMEDOUT) {  //Need to loop to handle spurious wakeups
        if (timeout) {
            ret = pthread_cond_timedwait(&queue->cond, &queue->mutex, &abstimeout);
        } else {
            pthread_cond_wait(&queue->cond, &queue->mutex);

        }
    }
    if (ret == ETIMEDOUT) {
        pthread_mutex_unlock(&queue->mutex);
        return ret;
    }

    firstrec = queue->first;
    queue->first = queue->first->next;
    queue->length--;

    if (queue->first == NULL) {
        queue->last = NULL;     // we know this since we hold the lock
        queue->length = 0;
    }


    msg->data = firstrec->msg.data;
    msg->msgtype = firstrec->msg.msgtype;
        msg->qlength = queue->length;

    release_msglist(queue,firstrec);
    pthread_mutex_unlock(&queue->mutex);

    return 0;
}

//maybe caller should supply a callback for cleaning the elements ?
int thread_queue_cleanup(struct threadqueue *queue, int freedata)
{
    struct msglist *rec;
    struct msglist *next;
    struct msglist *recs[2];
    int ret,i;
    if (queue == NULL) {
        return EINVAL;
    }

    pthread_mutex_lock(&queue->mutex);
    recs[0] = queue->first;
    recs[1] = queue->msgpool;
    for(i = 0; i < 2 ; i++) {
        rec = recs[i];
        while (rec) {
            next = rec->next;
            if (freedata) {
                free(rec->msg.data);
            }
            free(rec);
            rec = next;
        }
    }

    pthread_mutex_unlock(&queue->mutex);
    ret = pthread_mutex_destroy(&queue->mutex);
    pthread_cond_destroy(&queue->cond);

    return ret;

}

long thread_queue_length(struct threadqueue *queue)
{
    long counter;
    // get the length properly
    pthread_mutex_lock(&queue->mutex);
    counter = queue->length;
    pthread_mutex_unlock(&queue->mutex);
    return counter;

}
like image 174
nos Avatar answered Nov 14 '22 04:11

nos