Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Node.js TCP server incoming buffer

I have two node processes that speak to each other. I will call them [Node Server] and [Node Sender]. [Node Sender] continually processes information and writes a message over a TCP connection to [Node Server]. [Node Server] then writes back a status message.

Example of [Node Sender]:

var message = "Test Message";
[Node Sender].connection.write(message);

Example of [Node Server]:

[Node Server].socket.on("data", function(p_data) {
    this.write("OK");

    // Do some work with p_data
}

This works without issue, p_data always contains "Test Message" when sent at anything above 5 milliseconds. However, if I speed [Node Sender] to write every millisecond, p_data occasionally ends up with something like "Test MessageTest MessageTes".

I understand that the buffer in [Node Sender] is probably filling faster than the write command is sending it. Is there a way to force a one-to-one ratio in sending messages while still remaining asynchronous?

I can certainly just add a terminator to my message and just fill a buffer in [Node Server], but I wanted to make sure there wasn't something obvious I was missing.

like image 652
user747454 Avatar asked May 10 '11 19:05

user747454


2 Answers

No, you're not missing anything and yes, you do need to add some form of termination to your messages.

There are two basic problems here:

  1. The TCP protocol is stream-oriented, not message-oriented; it has no intrinsic knowledge of what might constitute a "message".

  2. The data event fired by the node.js net library indicates that some data has arrived but without any idea of what a message might contain, it cannot indicate that it has received any specific data.

So by sending your messages faster than Node can process them, the socket recv buffer fills with multiple "messages".

A typical solution to this problem is to add line-termination, as can be found in https://github.com/baudehlo/Haraka/blob/master/connection.js at lines 32-34:

self.client.on('data', function (data) {
    self.process_data(data);
});

and lines 110-134:

Connection.prototype.process_data = function (data) {
  if (this.disconnected) {
    logger.logwarn("data after disconnect from " + this.remote_ip);
    return;
  }

  this.current_data += data;
  this._process_data();
};

Connection.prototype._process_data = function() {
  var results;
  while (results = line_regexp.exec(this.current_data)) {
    var this_line = results[1];
    if (this.state === 'pause') {
        this.early_talker = 1;
        var self = this;
        // If you talk early, we're going to give you a delay
        setTimeout(function() { self._process_data() }, this.early_talker_delay);
        break;
    }
    this.current_data = this.current_data.slice(this_line.length);
    this.process_line(this_line);
  }
};
like image 98
Rob Raisch Avatar answered Sep 23 '22 12:09

Rob Raisch


You need to accumulate incoming buffer data to get your complete message. please refer to below example. this server expects data with 4 byte header and message body. header is unsigned int which means total length of body and the body is string data with delimiter '|'. please note that it is possible that this 'header and message' is not being received at a time. so we need to accumulate incoming data until we get a full length of data. And it is also possible that multiple 'header and message' is being received at a time. The point is that we need to data accumulation.

var SERVER_PORT = 8124;
var TCP_DELIMITER = '|';
var packetHeaderLen = 4; // 32 bit integer --> 4

var server = net.createServer( function(c) {
    var accumulatingBuffer = new Buffer(0); 
    var totalPacketLen   = -1; 
    var accumulatingLen  =  0;
    var recvedThisTimeLen=  0;
    var remoteAddress = c.remoteAddress;
    var address= c.address();
    var remotePort= c.remotePort;
    var remoteIpPort = remoteAddress +":"+ remotePort;

    console.log('-------------------------------'+remoteAddress);
    console.log('remoteIpPort='+ remoteIpPort); 

    c.on('data', function(data) {
        console.log('received data length :' + data.length ); 
        console.log('data='+ data); 

        recvedThisTimeLen = data.length;
        console.log('recvedThisTimeLen='+ recvedThisTimeLen);

        //accumulate incoming data
        var tmpBuffer = new Buffer( accumulatingLen + recvedThisTimeLen );
        accumulatingBuffer.copy(tmpBuffer);
        data.copy ( tmpBuffer, accumulatingLen  ); // offset for accumulating
        accumulatingBuffer = tmpBuffer; 
        tmpBuffer = null;
        accumulatingLen += recvedThisTimeLen ;
        console.log('accumulatingBuffer = ' + accumulatingBuffer  ); 
        console.log('accumulatingLen    =' + accumulatingLen );

        if( recvedThisTimeLen < packetHeaderLen ) {
            console.log('need to get more data(less than header-length received) -> wait..');
            return;
        } else if( recvedThisTimeLen == packetHeaderLen ) {
            console.log('need to get more data(only header-info is available) -> wait..');
            return;
        } else {
            console.log('before-totalPacketLen=' + totalPacketLen ); 
            //a packet info is available..
            if( totalPacketLen < 0 ) {
                totalPacketLen = accumulatingBuffer.readUInt32BE(0) ; 
                console.log('totalPacketLen=' + totalPacketLen );
            }
        }    

        //while=> 
        //in case of the accumulatingBuffer has multiple 'header and message'.
        while( accumulatingLen >= totalPacketLen + packetHeaderLen ) {
            console.log( 'accumulatingBuffer= ' + accumulatingBuffer );

            var aPacketBufExceptHeader = new Buffer( totalPacketLen  ); // a whole packet is available...
            console.log( 'aPacketBufExceptHeader len= ' + aPacketBufExceptHeader.length );
            accumulatingBuffer.copy( aPacketBufExceptHeader, 0, packetHeaderLen, accumulatingBuffer.length); // 

            ////////////////////////////////////////////////////////////////////
            //process one packet data
            var stringData = aPacketBufExceptHeader.toString();
            var usage = stringData.substring(0,stringData.indexOf(TCP_DELIMITER));
            console.log("usage: " + usage);
            //call handler
            (serverFunctions [usage])(c, remoteIpPort, stringData.substring(1+stringData.indexOf(TCP_DELIMITER)));
            ////////////////////////////////////////////////////////////////////

            //rebuild buffer
            var newBufRebuild = new Buffer( accumulatingBuffer.length );
            newBufRebuild.fill();
            accumulatingBuffer.copy( newBufRebuild, 0, totalPacketLen + packetHeaderLen, accumulatingBuffer.length  );

            //init      
            accumulatingLen -= (totalPacketLen +4) ;
            accumulatingBuffer = newBufRebuild;
            newBufRebuild = null;
            totalPacketLen = -1;
            console.log( 'Init: accumulatingBuffer= ' + accumulatingBuffer );   
            console.log( '      accumulatingLen   = ' + accumulatingLen );  

            if( accumulatingLen <= packetHeaderLen ) {
                return;
            } else {
                totalPacketLen = accumulatingBuffer.readUInt32BE(0) ; 
                console.log('totalPacketLen=' + totalPacketLen );
            }    
        }  
    }); 

    ...
});

please refer to below for whole example.

https://github.com/jeremyko/nodeChatServer

Hope this help.

like image 35
jeremyko Avatar answered Sep 19 '22 12:09

jeremyko