I have a program in which I am trying to implement a multiple-producer, multiple-consumer setting. I have code which seems to work well when I have one consumer and multiple producers, but introducing multiple consumer threads seems to raise a few odd issues.
Here's what I have right now:
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#define MAX 10
typedef struct travs {
int id;
int numBags;
int arrTime;
struct travs *next;
} travs;
travs *queue;
//travs *servicing[MAX];
int produced; // The total # of produced in the queue
pthread_mutex_t queue_lock;
//pthread_mutex_t staff_lock;
pthread_cond_t ct, cs;
int CheckIn(){
sleep(1);
if(produced != 0) return 1;
else return 0;
}
void *producerThread(void *args){
travs *traveler = (travs *)args;
// Acquire the mutex
pthread_mutex_lock(&queue_lock);
produced++;
// pthread_cond_signal(&cs);
pthread_cond_wait(&ct, &queue_lock);
printf("Producer %d is now checked in at time %d.\n", queue->id, (1+queue->arrTime));
queue = queue->next;
pthread_mutex_unlock(&queue_lock);
return NULL;
}
int Producer(int id, int numBags, int arrTime){
int ret;
pthread_t ttid;
travs *traveler = malloc(sizeof(travs));
traveler->id = id;
traveler->numBags = numBags;
traveler->arrTime = arrTime;
sleep(arrTime);
pthread_mutex_lock(&queue_lock);
if(queue != NULL) {
travs *check_in = malloc(sizeof(travs));
check_in = queue;
while(check_in->next != NULL){
check_in = check_in->next;
}
check_in->next = traveler;
}
else { queue = traveler; }
pthread_mutex_unlock(&queue_lock);
// Create a new traveler thread
ret = pthread_create(&ttid, NULL, producerThread, (void *)traveler);
// Check if thread creation was successful
if(ret == 0) {
printf("Producer %d has entered the check-in line at time %d; s/he is at position %d and has %d bags.\n", id, arrTime, produced, numBags);
pthread_cond_signal(&cs);
return 0;
}
else return -1;
}
void *consumerThread(void *arg){
int i = 0; // travelers serviced
char *name = (char *)arg;
while(1) { // run iteratively
// If 20 producers have been served, the consumer's work is done.
if(i == 20) {
printf("Consumer %s's service has completed!\n", name);
pthread_exit(NULL);
}
// Sleep for 10s if 5 travelers have been checked in
if (((i+1) % 5) == 0) {
// Wake up sleeping travelers
printf("Consumer %s is taking a break.\n", name);
sleep(2);
printf("Consumer %s's break is over.\n", name);
}
if(CheckIn()) {
pthread_mutex_lock(&queue_lock);
int j = 1;
pthread_cond_wait(&cs, &queue_lock);
printf("Producer %d presents ticket to consumer %s.\n", queue->id, name);
printf("Consumer %s gives boarding pass to producer %d.\n", name, queue->id);
while(j <= queue->numBags){
printf("Consumer %s checks in bag %d for producer %d; baggage tag is _X_.\n", name, j, queue->id);
j++;
}
// Signal producer being serviced that their check in is complete.
i++;
pthread_mutex_unlock(&queue_lock);
produced--;
pthread_cond_signal(&ct);
}
sleep(3);
}
}
int Consumer(char *Name) {
sleep(5);
int ret;
pthread_t stid;
// Create a staff thread
ret = pthread_create(&stid, NULL, consumerThread, (void *)Name);
// Acquire the lock
if(ret == 0) {
printf("Producer %s's service has begun!\n", Name);
return 0;
}
else return -1;
}
int main() {
int ret = 0;
char *staff_name = malloc(sizeof(char));
int staff_check = 0;
int trav_check = 0;
int id;
int bagnum;
int travtime;
FILE *consumer_fp;
FILE *producer_fp;
queue = malloc(sizeof(travs));
queue = NULL;
/*while(ret < 10){
servicing[ret] = malloc(sizeof(travs));
servicing[ret] = NULL;
}*/
// Initialize mutexes
pthread_mutex_init(&queue_lock, NULL);
//pthread_mutex_init(&staff_lock, NULL);
// Initialize condition variables
pthread_cond_init(&ct, NULL);
pthread_cond_init(&cs, NULL);
// Open the file so we can start reading from it
consumer_fp = fopen("staff.txt", "r");
producer_fp = fopen("travelers.txt", "r");
staff_check = fscanf(consumer_fp, "%s", staff_name);
trav_check = fscanf(producer_fp, "%d %d %d", &id, &bagnum, &travtime);
while(1){
K:
while(staff_check == 1) {
Consumer(staff_name);
staff_check = fscanf(consumer_fp, "%s", staff_name);
goto L;
}
L:
while(trav_check == 3) {
Producer(id, bagnum, travtime);
trav_check = fscanf(producer_fp, "%d %d %d", &id, &bagnum, &travtime);
goto K;
}
pthread_exit(NULL);
}
}
In this setting, every producer thread lives only a short time before returning, and does no real computation itself besides adding a new element to the global queue and printing a few aptly timed output lines.
However, when I introduce multiple consumers, only the last producer thread does anything.
From what I can surmise, I need the following:
i) Separate Queues for Producers waiting to be checked-in and producers which are currently being checked-in (commented out as travs *servicing[MAX] above)
ii) A separate mutex for consumers.
However, I'm not sure how to implement this. This is the idea I had in mind:
CheckIn() a producer thread and copy *queue to *servicing[i] (in consumer thread).
Set queue = queue->next (in producer thread).
But, how can I guarantee that when I copy *queue over that it won't have advanced a step already? Can I signal a waiting thread with a lock different from the one a thread currently holds? And, more importantly, how would I have different consumer threads process different traveler threads?
Any aid would be greatly appreciated!
Use one queue.
Write two functions: one to add an existing item to the queue and one to remove an item from the queue. Do not use any locking in these functions. Test them in a single-threaded application.
Then write two wrappers, one for the add function and one for the remove function. Each wrapper should take a mutex as an additional argument. Lock this mutex in the wrapper before calling the add or remove function, and unlock it afterwards.
Write the producer thread function so that it creates a new item and calls the add-item wrapper. Write the consumer thread function so that it calls the remove-item wrapper and destroys the removed item.
Set up the main() function so that it declares and initialises the mutex, then creates the various producer and consumer instances using pthread_create(). Pass the mutex as an argument to the thread functions.
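The steps above can be sketched roughly as follows. This is only a minimal illustration, not the asker's program: the names item, item_queue, worker_arg, queue_add_locked, queue_remove_locked and run_demo are all made up for this example, and the consumer simply drains the queue until it is empty instead of waiting for new work.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical item and queue types, for illustration only. */
typedef struct item { int id; struct item *next; } item;
typedef struct { item *head; item *tail; } item_queue;

/* Unlocked primitives: these can be tested single-threaded. */
static void queue_add(item_queue *q, item *it) {
    it->next = NULL;
    if (q->tail != NULL) q->tail->next = it;
    else q->head = it;
    q->tail = it;
}

static item *queue_remove(item_queue *q) {
    item *it = q->head;            /* NULL when the queue is empty */
    if (it != NULL) {
        q->head = it->next;
        if (q->head == NULL) q->tail = NULL;
        it->next = NULL;
    }
    return it;
}

/* Wrappers: same operations, with the mutex held around them. */
static void queue_add_locked(item_queue *q, pthread_mutex_t *m, item *it) {
    pthread_mutex_lock(m);
    queue_add(q, it);
    pthread_mutex_unlock(m);
}

static item *queue_remove_locked(item_queue *q, pthread_mutex_t *m) {
    pthread_mutex_lock(m);
    item *it = queue_remove(q);
    pthread_mutex_unlock(m);
    return it;
}

/* Everything a thread needs is passed in; no globals. */
typedef struct {
    item_queue *q;
    pthread_mutex_t *lock;
    int id;        /* producer: id of the item to create */
    int consumed;  /* consumer: number of items it removed */
} worker_arg;

static void *producer(void *p) {
    worker_arg *a = p;
    item *it = malloc(sizeof *it);
    if (it != NULL) {
        it->id = a->id;
        queue_add_locked(a->q, a->lock, it);
    }
    return NULL;
}

static void *consumer(void *p) {
    worker_arg *a = p;
    item *it;
    /* Simplification: stop as soon as the queue is empty. */
    while ((it = queue_remove_locked(a->q, a->lock)) != NULL) {
        free(it);
        a->consumed++;
    }
    return NULL;
}

enum { MAX_THREADS = 16 };

/* Runs n_prod producers, then n_cons consumers; returns items consumed. */
static int run_demo(int n_prod, int n_cons) {
    item_queue q = { NULL, NULL };
    pthread_mutex_t lock;
    pthread_t prod[MAX_THREADS], cons[MAX_THREADS];
    worker_arg pa[MAX_THREADS], ca[MAX_THREADS];
    int total = 0;

    if (n_prod > MAX_THREADS || n_cons > MAX_THREADS) return -1;
    pthread_mutex_init(&lock, NULL);
    for (int i = 0; i < n_prod; i++) {
        pa[i] = (worker_arg){ &q, &lock, i, 0 };
        if (pthread_create(&prod[i], NULL, producer, &pa[i]) != 0) return -1;
    }
    for (int i = 0; i < n_prod; i++) pthread_join(prod[i], NULL);
    for (int i = 0; i < n_cons; i++) {
        ca[i] = (worker_arg){ &q, &lock, 0, 0 };
        if (pthread_create(&cons[i], NULL, consumer, &ca[i]) != 0) return -1;
    }
    for (int i = 0; i < n_cons; i++) {
        pthread_join(cons[i], NULL);
        total += ca[i].consumed;
    }
    pthread_mutex_destroy(&lock);
    return total;
}
```

Because each item is removed while the mutex is held, no two consumers can ever receive the same item, which is what lets different consumer threads process different travelers. In this sketch the producers are joined before the consumers start so that the count is deterministic; a real program in which both run concurrently would additionally need a way for consumers to wait for work, such as a condition variable paired with the same mutex.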
As I mentioned, this is a memory leak:
travs *traveler = malloc(sizeof(travs));
traveler = (travs *)args;
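The pointer returned by malloc is overwritten on the very next line, so that memory can never be freed. I'm not going to go into detail about "what's so bad about memory leaks?". If you want that answer, ask Google that question. You probably meant: travs *traveler = args;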
if(queue != NULL) {
travs *check_in = malloc(sizeof(travs));
check_in = queue;
while(check_in->next != NULL){
check_in = check_in->next;
}
check_in->next = traveler;
}
else { queue = traveler; }
Memory leak aside, why is queue guarded by a mutex in your other functions, while there is no mutex guarding it at all in this code? It seems like you've missed the point of mutexes. Your code races here.
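As a rough sketch of what the append could look like with both problems addressed: the function name enqueue_traveler is made up for this example, the static initialiser stands in for the pthread_mutex_init() call in the question, and the explicit next = NULL is an addition of mine, needed because malloc does not zero the memory it returns.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

typedef struct travs {
    int id;
    int numBags;
    int arrTime;
    struct travs *next;
} travs;

static travs *queue = NULL;
static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;

/* Hypothetical helper: append traveler to the tail of the global queue. */
static void enqueue_traveler(travs *traveler) {
    traveler->next = NULL;            /* malloc leaves this field undefined */

    pthread_mutex_lock(&queue_lock);  /* every access to queue is guarded */
    if (queue != NULL) {
        travs *check_in = queue;      /* a cursor only: no malloc needed */
        while (check_in->next != NULL) {
            check_in = check_in->next;
        }
        check_in->next = traveler;
    } else {
        queue = traveler;
    }
    pthread_mutex_unlock(&queue_lock);
}
```

The walk down the list and the final link both happen while the lock is held, so another thread cannot change queue or any next pointer halfway through.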
Perhaps pthread_rwlock_ts would be more suitable for this kind of code.
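For reference, a read-write lock around the same kind of list might look something like the sketch below. The helper names queue_length and queue_pop are invented for this example. One thing to keep in mind: pthread_cond_wait() only works with a pthread_mutex_t, so the condition variables in the question could not be paired with a read-write lock directly.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

typedef struct travs {
    int id;
    int numBags;
    int arrTime;
    struct travs *next;
} travs;

static travs *queue = NULL;
static pthread_rwlock_t queue_rwlock = PTHREAD_RWLOCK_INITIALIZER;

/* Reader: several threads may hold the read lock at the same time. */
static int queue_length(void) {
    int n = 0;
    pthread_rwlock_rdlock(&queue_rwlock);
    for (const travs *t = queue; t != NULL; t = t->next) {
        n++;
    }
    pthread_rwlock_unlock(&queue_rwlock);
    return n;
}

/* Writer: the write lock excludes all readers and other writers. */
static travs *queue_pop(void) {
    pthread_rwlock_wrlock(&queue_rwlock);
    travs *t = queue;                 /* NULL when the queue is empty */
    if (t != NULL) {
        queue = t->next;
        t->next = NULL;
    }
    pthread_rwlock_unlock(&queue_rwlock);
    return t;
}
```

A read-write lock mainly pays off when read-only accesses clearly outnumber modifications; if nearly every access adds or removes an element, it behaves much like a plain mutex.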