Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Two types of message in a message queue

I was writing a program that starts two processes.

The first process, the "client" sends two type of messages.

The first type increases a shared resource (int). The second type sets the resource to 0.

After 10 messages, the client has to send a message with a special type that forces the threads listening on the two queues to stop. So the client sends two messages (one for each queue) with a special value in the type field in order to terminate the threads.

The second process is the "server".

The server has three threads:

the first one is listening on the "increase" queue. It has to handle the increase request until the termination message. So i wrote:

do{
msgrcv(id_i,&msg,dimensione,INCREMENTA,0);
 pthread_mutex_lock(&mutex);
 printf("THREAD 1: Il contatore vale:%d\n",*contatore);
 incremento = msg.contenuto;
 printf("THREAD 1: Incremento di : %d\n",incremento);
  *contatore+=incremento;
 printf("THREAD 1: Il contatore vale:%d\n",*contatore);
 pthread_mutex_unlock(&mutex);
 msgrcv(id_i,&msg,dimensione,TERMINA,IPC_NOWAIT); //IPC_NOWAIT or the thread will
 freeze after the first message
}
while(msg.tipo!=TERMINA);

The second one has to handle the "set to 0" requests until the termination message.

do{msgrcv(id_a,&msg,dimensione,AZZERA,0);
pthread_mutex_lock(&mutex);
printf("THREAD 2: IL CONTATORE VALE:%d\n",*contatore);
*contatore=0;
printf("Thread 2: Contatore azzerato. Ora vale : %d\n",*contatore);
pthread_mutex_unlock(&mutex);
 msgrcv(id_a,&msg,dimensione,TERMINA,IPC_NOWAIT);//IPC_NOWAIT or the thread will
 freeze after the first message
 }
while(msg.tipo!=TERMINA);

The third thread increases the value of the resource using the mutex to enter in mutual exclusion.

The problem is that thread1 and thread2 of the server process doesn't terminate where they should. In fact, they stuck on the first msgrcv() after all the increase/set0 messages. So the problem is that the two threads can't manage to listen the termination message.

I tried to set IPC_NOWAIT also for the first msgrcv but didn't work

like image 299
EagleOne Avatar asked Jan 31 '16 16:01

EagleOne


1 Answers

You're relying on a race condition, one that you will almost never win.

Let's take a look at the first block:

do {
    // Note the msg type:           vvvvvvvvvv
    msgrcv( id_i, &msg, dimensione, INCREMENTA, 0 );

    // ...

    // Note the msg type:           vvvvvvv
    msgrcv( id_i, &msg, dimensione, TERMINA, IPC_NOWAIT );
}
while( msg.tipo != TERMINA );

That second 'msgrcv' call in the loop is your attempt to look for a terminator message type, before you loop back to the top and block, waiting for another INCREMENTA message.

Consider the following chain of events:

   Sender              Receiver
   ---------------     -----------------
 1                      Call msgrcv with INCREMENTA. Block indefinitely
 2  Send 'INCREMENTA'
 3                      msgrcv returns. Begin processing increment msg.
 4                      Processing finshed.
 5                      Call msgrcv with TERMINA.
 6                      No TERMINA message found (queue empty), returns immediately.
 7                      Go to top of loop.
 8                      Call msgrcv with INCREMENTA. Block indefinitely
 9  Send 'TERMINA'      
10                      Nothing happens because we're waiting for 'INCREMENTA'.

You can't try to query the message queue in this pattern. If events 8 and 9 had been reversed, your logic would have happened to work - but that's a race condition, and one that you're likely to lose frequently.

Instead, why not use msgrcv to receive any type of message, and then after reading the message from the queue, figure out what type of message you got and handle it from there. If you pass 0 for the 'msgtyp' parameter to msgrcv, it'll give you all messages and then you can handle it how you please.

  while(true) {

      // Get any msg type:           vv
      msgrcv( id_i, &msg, dimensione, 0, 0 );

      if ( msg.tipo == TERMINA ) {
          break;
      }
      else {
           // ...
      }
  }
like image 126
antiduh Avatar answered Oct 09 '22 15:10

antiduh