
What is the meaning of $channel->wait() in RabbitMQ

I am quite new to RabbitMQ. I'm working with the php-amqplib library in CodeIgniter, and there are a few things I still don't understand:

  • Why is $channel->wait() used?
  • Why does it always sit inside an endless while loop?
  • How can I bypass the infinite while loop, if that is possible at all?

For example, when one user of my project broadcasts a new campaign to 100k leads, a second user who only has 100 mails to send is affected: the second user has to wait until all 100k mails have been delivered before getting a turn.

I need a solution for concurrent consumers that work smoothly without affecting one another.

Here is my code snippet:

public function campaign2(){
        $this->load->library('mylibrary');
        for( $i=1;$i<=5;$i++ ) {
            $url = "http://localhost/myproject/rabbit/waiting";
            $param = array('index' => $i);
            $this->waiting($i);
        }          
}

public function waiting($i)
    {
        ini_set('memory_limit','400M');
        ini_set('max_execution_time', 0);
        ini_set('display_errors', 1);

        ${'conn_'.$i} = connectRabbit();
        ${'channel_'.$i} = ${'conn_'.$i}->channel();
        ${'channel_'.$i}->exchange_declare('ha-local-campaign-'.$i.'-exchange', 'fanout', false, true, false);
        $q    = populateQueueName('campaign-'.$i);
        ${'channel_'.$i}->queue_declare($q, false, true, false, false); 
        ${'channel_'.$i}->queue_bind($q, 'ha-local-campaign-'.$i.'-exchange', 'priority.'.$i);
        $consumer_tag = 'campaign_consumer' ;
        function process_message($msg) {
            echo 'Mail Sent';
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        }
        function shutdown($channel, $conn){
            echo '['.date('H:i:s').'] Campaign consumer - Shutdown!!'; 
        }

        ${'channel_'.$i}->basic_consume($q, $consumer_tag, false, false, true, false,'process_message');
        while(1) {
           ${'channel_'.$i}->wait();
        }
        register_shutdown_function('shutdown', ${'channel_'.$i}, ${'conn_'.$i});  
    }

If anyone could kindly guide me through this, I would be grateful.

Asked by NeonHead, Jan 20 '17 12:01



1 Answer

When you call $channel->wait(), you are:

  • Blocking until the broker sends something on the channel, such as a message pushed from a queue you are consuming.

  • Invoking the callback registered for that consumer with each message that arrives.

From the "hello world" example, step by step:

// First, you define `$callback` as a function receiving
// one parameter (the _message_).
$callback = function($msg) {
  echo " [x] Received ", $msg->body, "\n";
};

// Then, you assign `$callback` to the "hello" queue.
$channel->basic_consume('hello', '', false, true, false, false, $callback);

// Finally: while there are any callbacks defined for the channel,
while(count($channel->callbacks)) {
    // block until a message arrives, then call the corresponding
    // callback, passing the message as a parameter
    $channel->wait();
}
// This is an infinite loop: as long as there are callbacks,
// it'll run forever unless you interrupt the script's execution.
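
On bypassing the infinite loop: one possible approach is to give wait() a timeout and stop once the channel has been idle for a while. The sketch below assumes php-amqplib's wait($allowed_methods, $non_blocking, $timeout) signature and its AMQPTimeoutException. The helper name consumeUntilIdle and the 5-second default are made up for illustration.

```php
<?php
// Sketch only: assumes php-amqplib is installed via Composer and that
// $channel is an AMQPChannel with at least one consumer registered.
use PhpAmqpLib\Exception\AMQPTimeoutException;

/**
 * Hypothetical helper: keep handling deliveries until the channel has
 * been idle for $idleSeconds, then return instead of looping forever.
 * Returns the number of wait() calls that completed without timing out.
 */
function consumeUntilIdle($channel, float $idleSeconds = 5.0): int
{
    $handled = 0;
    while (count($channel->callbacks)) {
        try {
            // Third argument: how many seconds to block before giving up.
            $channel->wait(null, false, $idleSeconds);
            $handled++;
        } catch (AMQPTimeoutException $e) {
            break; // nothing arrived in time, so stop consuming
        }
    }
    return $handled;
}
```

After basic_consume(), calling consumeUntilIdle($channel, 10) would return once the queue has stayed quiet for ten seconds, so the script can close the channel and connection and exit normally.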

Have your second user publish to a different queue, so that the small job does not sit behind the 100k-message backlog. You can have as many queues as you want.
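
A rough sketch of what a per-queue worker could look like, assuming php-amqplib's basic_qos() and basic_consume() and the delivery_info style of acknowledging used in the question. The helper name subscribeCampaign, the $sendMail callable and the default prefetch of 1 are illustrative assumptions, not part of the library.

```php
<?php
// Sketch only: assumes php-amqplib; $channel is an AMQPChannel and the
// queue named in $queue has already been declared.

/**
 * Hypothetical helper: subscribe to one campaign queue with a prefetch
 * limit, so this worker holds at most $prefetch unacknowledged messages.
 */
function subscribeCampaign($channel, string $queue, callable $sendMail, int $prefetch = 1): void
{
    // basic_qos(prefetch_size, prefetch_count, global)
    $channel->basic_qos(null, $prefetch, null);

    $channel->basic_consume(
        $queue, '', false, false, false, false,
        function ($msg) use ($sendMail) {
            $sendMail($msg->body);
            // Acknowledge only after the mail has been handed off.
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        }
    );
}
```

Each user's queue would then get its own worker process (for example, one CLI script per queue) that calls subscribeCampaign() and runs its own wait() loop, so a long campaign only blocks the worker dedicated to it.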

Answered by yivi, Sep 25 '22 11:09