Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

amqp.node won't detect a connection drop

We have a Node.js script running a socket.io server whose clients consume messages from a RabbitMQ queue. We recently migrated to Amazon AWS, and RabbitMQ is now a cluster of two machines (redundant instances). The AMQP connection is lost from time to time. This is a limitation that arises from a high-availability environment with redundant VMs, and we have to cope with it. When a reconnection attempt is made, DNS chooses which instance to connect to. Since the cluster replicates data, it doesn't matter which instance is chosen.

The problem is that no reconnection attempt is ever made: after a while the connection is lost, and amqp.node apparently fails to notice it. At the same time, the consumers stop receiving messages and the socket.io server simply stops accepting new connections.

We set a 55-second heartbeat timeout in the RabbitMQ URL (not to be confused with the socket.io heartbeat timeout). We also listen for 'error' and 'close' events through amqp.node's callback API, but those events are apparently never emitted. The queues expect consumed messages to be acknowledged. We want the Node script to detect a lost connection and terminate itself, so that the environment automatically starts a new process and establishes a fresh connection.

Here is the code. Perhaps we are misusing the amqp.node callback API, or something else is wrong.

// --- HTTP / socket.io server -------------------------------------------------
var express = require('express');
// Declared with `var`: previously an implicit global.
var app = express();
var http = require('http');
var serverio = http.createServer(app);
var io = require('socket.io').listen(serverio, { log: false });
var socket;
var allcli = [];

// --- ANSI terminal colours used in console output -----------------------------
// '\x1b' is the ESC character, identical to the legacy octal escape '\033'
// but valid in strict-mode code as well.
// `orange` was previously missing from this list and leaked as an implicit global.
var red, blue, green, magenta, orange, reset;
red     = '\x1b[31m';
blue    = '\x1b[34m';
green   = '\x1b[32m';
magenta = '\x1b[35m';
orange  = '\x1b[43m';
reset   = '\x1b[0m';

// --- RabbitMQ ----------------------------------------------------------------
var queue = 'ha.atualizacao_mobile';
// AMQP heartbeat in seconds. A value near 60 s conflicted with the load
// balancer in front of the RabbitMQ cluster, which drops connections that
// carry no data for about a minute. The dropped connection then went
// unnoticed by the client. Keep this well below that idle window.
var urlRabbit = 'amqp://login:password@host?heartbeat=30'; // Amazon
var amqp = require('amqplib/callback_api');
var debug = true;

// Configure the socket.io (not AMQP) heartbeat. The interval is lengthened
// to reduce battery consumption on mobile clients. The timeout is set one
// minute above the interval, so a client is not dropped between two
// heartbeats. Units are seconds, as the log messages state.
console.log("Original Socket.IO heartbeat interval: " + io.get('heartbeat interval') + " seconds.");
io.set('heartbeat interval', 10 * 60);
console.log("Heartbeat interval changed to " + io.get('heartbeat interval') + " seconds to reduce battery consumption in the mobile clients.");

console.log("Original Socket.IO heartbeat timeout: " + io.get('heartbeat timeout') + " seconds.");
io.set('heartbeat timeout', 11 * 60);
console.log("Heartbeat timeout set to " + io.get('heartbeat timeout') + " seconds.");


// Wire up the listeners for each newly connected socket.io client.
io.sockets.on('connection', function onClientConnected(socket) {

    // Errors on an individual client socket are logged, then ignored.
    socket.on('error', function onClientError(exc) {
        console.log(orange + "Ignoring exception: " + exc + reset);
    });

    // Client -> server message (handler body elided in this listing).
    socket.on('send-indice', function onSendIndice(data) {
        // Some business logic
    });

    // Client went away (handler body elided in this listing).
    socket.on('disconnect', function onClientDisconnect() {
        // Some business logic
    });

});

/**
 * Handle one JSON-decoded message taken from the RabbitMQ queue.
 * It is invoked by the queue consumer in processRabbitConnection.
 * The real business logic is elided in this listing; presumably it forwards
 * `data` to the connected socket.io clients (TODO confirm).
 *
 * @param {*} data  Decoded message body.
 */
function updatecli(data) {
    // Some business logic
}

// Open the AMQP connection and start consuming.
// The process is meant to be restarted by its supervisor whenever the broker
// link is unusable. The restarted process reconnects, letting DNS pick a
// cluster node. So every failure path below exits with a non-zero code: a
// failed connect, a lost connection, or a finalize triggered by
// processRabbitConnection.
amqp.connect(urlRabbit, null, function(err, conn) {
    if (err != null) {
        console.log("Error creating connection: " + err);
        // Previously the script kept running with no broker connection, so
        // clients would never get updates and nothing would restart it.
        process.exit(1);
        return;
    }

    conn.on('error', function(err) {
        // amqplib emits 'close' after an 'error', and the handler below exits.
        console.log("Generated event 'error': " + err);
    });

    conn.on('close', function() {
        console.log("Connection closed.");
        // Non-zero exit: a supervisor that restarts only on failure would
        // otherwise treat this unexpected loss as a clean shutdown.
        process.exit(1);
    });

    // Once consuming can no longer continue, close the connection. That
    // fires 'close' above and ends the process.
    processRabbitConnection(conn, function() {
        conn.close();
    });
});

/**
 * Open a channel on `conn`, assert the work queue, and consume it.
 * Each delivery is JSON-decoded, handed to updatecli(), and then acked.
 *
 * @param conn      amqplib callback-API connection.
 * @param finalize  Called at most once, when consuming can no longer continue.
 *                  That covers channel, queue or consumer setup failing, the
 *                  channel emitting 'error', or the broker cancelling the
 *                  consumer.
 */
function processRabbitConnection(conn, finalize) {
    var finished = false;

    // Several failure signals may arrive, e.g. a consumer cancel followed by
    // a channel error. Forward only the first, so the caller's teardown
    // (conn.close()) is not run twice.
    function finish() {
        if (finished) {
            return;
        }
        finished = true;
        finalize();
    }

    conn.createChannel(function(err, channel) {
        if (err != null) {
            console.log("Error creating channel: " + err);
            return finish();
        }

        // A broker-side channel close leaves the connection itself open, so
        // it must be handled here or consumption ends without a restart.
        channel.on('error', function(chErr) {
            console.log("Channel error: " + chErr);
            finish();
        });

        channel.assertQueue(queue, null, function(err) {
            if (err != null) {
                console.log("Error asserting queue " + queue + ": " + err);
                return finish();
            }

            channel.consume(queue, function(msg) {
                if (msg === null) {
                    // amqplib passes null when the broker cancels this consumer
                    // (e.g. the queue was deleted). No more messages will
                    // arrive on it, so tear down and let the process restart.
                    console.log("Consumer cancelled by the broker.");
                    return finish();
                }
                try {
                    updatecli(JSON.parse(msg.content.toString()));
                } catch (err) {
                    console.log("Error in JSON: " + err);
                }
                // Acked whether or not parsing and processing succeeded, so a
                // malformed message is not redelivered.
                channel.ack(msg);
            }, null, function(err) {
                if (err != null) {
                    console.log("Error consuming message: " + err);
                    return finish();
                }
            });
        });
    });
}

// Start accepting HTTP / socket.io connections, logging once the port is bound.
function onServerListening() {
  console.log('Server: Socket IO Online  - Port: 9128 - ' + new Date());
}
serverio.listen(9128, onServerListening);
Asked by Piovezan on Jan 05 '15 at 11:01.


People also ask

How do I check Rabbitmq connection?

Verify the server configuration. The recommended steps are:
- Make sure the node is running, using `rabbitmq-diagnostics status`.
- Verify that the config file is in the correct place and has correct syntax and structure.
- Inspect the listeners using `rabbitmq-diagnostics listeners`, or the listeners section of `rabbitmq-diagnostics status`.

Why is Rabbitmq connection closed?

A common cause for connections being abruptly closed as soon as they're started is a TCP load balancer's heartbeat. If this is the case you should see these messages at very regular intervals, and the generally accepted practice seems to be to ignore them.

How many connections can Rabbitmq handle?

By default, RabbitMQ limits the TCP connection backlog to 128. The backlog is the queue of pending (not yet accepted) connections; it is not a cap on the total number of open connections.


1 Answers

Apparently the issue has been solved: the heartbeat of nearly 60 seconds was the problem. It conflicted with the load balancer in front of RabbitMQ. The load balancer checks roughly once a minute whether any data has passed through a connection, and drops the connection if none has. The AMQP connection then stops receiving messages, and the library apparently does not react to that. A lower heartbeat (e.g. 30 seconds) is needed to avoid this situation.

Answered by Piovezan on Oct 20 '22 at 09:10.