
RabbitMQ - PHP AMQP broken pipe error

Tags: php, rabbitmq, amqp

I am processing a huge XML document (around a million entries) and importing a formatted version into the database via RabbitMQ. Each time, after publishing around 200,000 entries, I receive a broken pipe error, and RabbitMQ is unable to recover from it.

Notice Error: fwrite(): send of 2651 bytes failed with errno=11 Resource temporarily unavailable in [/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc, line 439]

Notice Error: fwrite(): send of 33 bytes failed with errno=104 Connection reset by peer in [/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc, line 439]

Notice Error: fwrite(): send of 19 bytes failed with errno=32 Broken pipe in [/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc, line 439]

This subsequently causes a node down error, and the process has to be killed manually to recover from it.

These are my class methods:

public function publishMessage($message) {
    if (!isset($this->conn)) {
        $this->_createNewConnectionAndChannel();
    }
    try {
        $this->ch->basic_publish(
            new AMQPMessage($message, array('content_type' => 'text/plain')), 
            $this->defaults['exchange']['name'], 
            $this->defaults['binding']['routing_key']
        );
    } catch (Exception $e) {
        echo "Caught exception : " . $e->getMessage();
        echo "Creating new connection.";
        $this->_createNewConnectionAndChannel();
        $this->publishMessage($message); // try again
    }
}

protected function _createNewConnectionAndChannel() {
    if (isset($this->conn)) {
        $this->conn->close();
    }

    if(isset($this->ch)) {
        $this->ch->close();
    }

    $this->conn = new AMQPConnection(
        $this->defaults['connection']['host'], 
        $this->defaults['connection']['port'], 
        $this->defaults['connection']['user'], 
        $this->defaults['connection']['pass']
    );
    $this->ch = $this->conn->channel();
    $this->ch->access_request($this->defaults['channel']['vhost'], false, false, true, true);
    $this->ch->basic_qos(0 , 20 , 0); // fair dispatching

    $this->ch->queue_declare(
        $this->defaults['queue']['name'],
        $this->defaults['queue']['passive'],
        $this->defaults['queue']['durable'],
        $this->defaults['queue']['exclusive'],
        $this->defaults['queue']['auto_delete']
    );

    $this->ch->exchange_declare(
        $this->defaults['exchange']['name'],
        $this->defaults['exchange']['type'],
        $this->defaults['exchange']['passive'],
        $this->defaults['exchange']['durable'],
        $this->defaults['exchange']['auto_delete']
    );

    $this->ch->queue_bind(
        $this->defaults['queue']['name'],
        $this->defaults['exchange']['name'],
        $this->defaults['binding']['routing_key']
    );
}

Any help will be appreciated.

Dharmanshu Kamra asked Dec 06 '22 08:12



1 Answer

Make sure your user has been granted access to the virtual host in RabbitMQ. I had created a new user and forgot to set access rights for the "/" virtual host, which is used by default.

You can do that via the management panel: yourhost:15672 > Admin > click on the user > look for "Set permission".
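If the management panel is not available, the same permission can probably be granted from a shell on the broker host with `rabbitmqctl set_permissions`. The following is only a sketch: the function name `grant_vhost_access`, the `RABBITMQCTL` override variable, and the user name `myuser` are assumptions made for illustration, and the `".*"` patterns grant configure, write, and read access to every resource in the virtual host, which may be broader than you want.

```shell
# Sketch: grant a user full access to a RabbitMQ virtual host.
# grant_vhost_access and RABBITMQCTL are hypothetical names, not part of RabbitMQ.
# Usage: grant_vhost_access USER [VHOST]   (VHOST defaults to "/")
grant_vhost_access() {
    user="$1"
    vhost="${2:-/}"
    if [ -z "$user" ]; then
        echo "usage: grant_vhost_access USER [VHOST]" >&2
        return 1
    fi
    # The three patterns are the configure, write, and read regexes, in that order.
    # RABBITMQCTL can be overridden, e.g. to point at a different binary.
    ${RABBITMQCTL:-rabbitmqctl} set_permissions -p "$vhost" "$user" ".*" ".*" ".*"
}
```

Calling `grant_vhost_access myuser` would then target the default "/" virtual host. Running `rabbitmqctl list_permissions -p /` afterwards should show whether the user now appears.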

P.S. I assume your RabbitMQ service is running, the user exists, and the password is correct.

Pawka answered Dec 18 '22 00:12
