Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Socket.io, cluster, express and sync events

I have a big problem sice 1 week. I try to convert my node.JS project actually run on single core to multi core with cluster.

With websockets, at this moment, i have no problems for events but, for xhr-polling or jsonp-polling, i have big problems with socket.io on cluster mode.

this is my server configuration :

00-generic.js

'use strict';

var http            = require('http'),
    os              = require('os'),
    cluster         = require('cluster');

module.exports = function(done) {
    var app = this.express,
        port = process.env.PORT || 3000,
        address = '0.0.0.0';

    if(this.env == 'test'){
        port = 3030;
    }

    var self = this;
    var size = os.cpus().length;

    if (cluster.isMaster) {
        console.info('Creating HTTP server cluster with %d workers', size);

        for (var i = 0; i < size; ++i) {
            console.log('spawning worker process %d', (i + 1));
            cluster.fork();
        }

        cluster.on('fork', function(worker) {
            console.log('worker %s spawned', worker.id);
        });
        cluster.on('online', function(worker) {
            console.log('worker %s online', worker.id);
        });
        cluster.on('listening', function(worker, addr) {
            console.log('worker %s listening on %s:%d', worker.id, addr.address, addr.port);
        });
        cluster.on('disconnect', function(worker) {
            console.log('worker %s disconnected', worker.id);
        });
        cluster.on('exit', function(worker, code, signal) {
            console.log('worker %s died (%s)', worker.id, signal || code);
            if (!worker.suicide) {
                console.log('restarting worker');
                cluster.fork();
            }
        });
    } else {
        http.createServer(app).listen(port, address, function() {
            var addr = this.address();
            console.log('listening on %s:%d', addr.address, addr.port);
            self.server = this;
            done();
        });
    }
};

03-socket.io.js

"use strict";
var _               = require('underscore'),
    socketio        = require('socket.io'),
    locomotive      = require('locomotive'),
    RedisStore      = require("socket.io/lib/stores/redis"),
    redis           = require("socket.io/node_modules/redis"),
    v1              = require(__dirname + '/../app/socket.io/v1'),
    sockets         = require(__dirname + '/../../app/socket/socket'),
    config          = require(__dirname + '/../app/global'),
    cluster         = require('cluster');

module.exports = function () {
    if (!cluster.isMaster) {
        this.io = socketio.listen(this.server);

        var pub             = redis.createClient(),
            sub             = redis.createClient(),
            client          = redis.createClient();

        this.io.enable('browser client minification');  // send minified client
        this.io.enable('browser client etag');          // apply etag caching logic based on version number
        this.io.enable('browser client gzip');          // gzip the file

        this.io.set("store", new RedisStore({
            redisPub        : pub,
            redisSub        : sub,
            redisClient     : client
        }));
        this.io.set('log level', 2);
        this.io.set('transports', [
            'websocket',
            'jsonp-polling'
        ]);
        this.io.set('close timeout', 24*60*60);
        this.io.set('heartbeat timeout', 24*60*60);

        this.io.sockets.on('connection', function (socket) {
            console.log('connected with ' + this.io.transports[socket.id].name);

            // partie v1 @deprecated
            v1.events(socket);

            // partie v1.1 refaite
            _.each(sockets['1.1'], function(Mod) {
                var mod = new Mod();
                mod.launch({
                    socket  : socket,
                    io      : this.io
                });
            }, this);

        }.bind(this));
    }
};

With polling, the client connects from time to time on a different process than that initiated listeners. Similarly, the communication server to the client with emit.

With a little searching, I found it necessary to pass by a store for socket.io to share the data connection. So I built RedisStore socket.io as shown in the documentation but even with that, I find myself with events not arriving safely and I still get this error message:

warn: client not handshaken client should reconnect

EDIT

Now, the warn error is not called. I change the redisStore to socket.io-clusterhub BUT now, events are not always called. Sometimes as if the polling request was captured by another worker than that which began the listeners and so it nothing happens. Here is the new configuration:

'use strict';

var http            = require('http'),
    locomotive      = require('locomotive'),
    os              = require('os'),
    cluster         = require('cluster'),
    config          = require(__dirname + '/../app/global'),
    _               = require('underscore'),
    socketio        = require('socket.io'),
    v1              = require(__dirname + '/../app/socket.io/v1'),
    sockets         = require(__dirname + '/../../app/socket/socket');

module.exports = function(done) {
    var app = this.express,
        port = process.env.PORT || 3000,
        address = '0.0.0.0';

    if(this.env == 'test'){
        port = 3030;
    }

    var self = this;
    var size = os.cpus().length;

    this.clusterStore = new (require('socket.io-clusterhub'));

    if (cluster.isMaster) {
        for (var i = 0; i < size; ++i) {
            console.log('spawning worker process %d', (i + 1));
            cluster.fork();
        }

        cluster.on('fork', function(worker) {
            console.log('worker %s spawned', worker.id);
        });
        cluster.on('online', function(worker) {
            console.log('worker %s online', worker.id);
        });
        cluster.on('listening', function(worker, addr) {
            console.log('worker %s listening on %s:%d', worker.id, addr.address, addr.port);
        });
        cluster.on('disconnect', function(worker) {
            console.log('worker %s disconnected', worker.id);
        });
        cluster.on('exit', function(worker, code, signal) {
            console.log('worker %s died (%s)', worker.id, signal || code);
            if (!worker.suicide) {
                console.log('restarting worker');
                cluster.fork();
            }
        });
    } else {
        var server = http.createServer(app);

        this.io = socketio.listen(server);

        this.io.configure(function() {
            this.io.enable('browser client minification');  // send minified client
            this.io.enable('browser client etag');          // apply etag caching logic based on version number
            this.io.enable('browser client gzip');          // gzip the file

            this.io.set('store', this.clusterStore);
            this.io.set('log level', 2);
            this.io.set('transports', [
                'websocket',
                'jsonp-polling'
            ]);
            //this.io.set('close timeout', 24*60*60);
            //this.io.set('heartbeat timeout', 24*60*60);
        }.bind(this));

        this.io.sockets.on('connection', function (socket) {
            console.log('connected with ' + this.io.transports[socket.id].name);
            console.log('connected to worker: ' + cluster.worker.id);

            // partie v1 @deprecated
            v1.events(socket);

            // partie v1.1 refaite
            _.each(sockets['1.1'], function(Mod) {
                var mod = new Mod();
                mod.launch({
                    socket  : socket,
                    io      : this.io
                });
            }, this);

        }.bind(this));

        server.listen(port, address, function() {
            var addr = this.address();
            console.log('listening on %s:%d', addr.address, addr.port);
            self.server = this;
            done();
        });
    }
};
like image 711
throrin19 Avatar asked May 27 '14 09:05

throrin19


2 Answers

From that source : http://socket.io/docs/using-multiple-nodes/

If you plan to distribute the load of connections among different processes or machines, you have to make sure that requests associated with a particular session id connect to the process that originated them.

This is due to certain transports like XHR Polling or JSONP Polling relying on firing several requests during the lifetime of the “socket”.

To route connections to the same worker every time :

sticky-session

This is, in the socket.io documentation, the recommended way to route requests to the same worker every time.

https://github.com/indutny/sticky-session

A simple performant way to use socket.io with a cluster.

Socket.io is doing multiple requests to perform handshake and establish connection with a client. With a cluster those requests may arrive to different workers, which will break handshake protocol.

var sticky = require('sticky-sesion');

sticky(function() {
  // This code will be executed only in slave workers

  var http = require('http'),
      io = require('socket.io');

  var server = http.createServer(function(req, res) {
    // ....
  });
  io.listen(server);

  return server;
}).listen(3000, function() {
  console.log('server started on 3000 port');
});

To pass messages between nodes :

socket.io-redis

This is, in socket.io documentation, the recommended way to share messages between workers.

https://github.com/automattic/socket.io-redis

By running socket.io with the socket.io-redis adapter you can run multiple socket.io instances in different processes or servers that can all broadcast and emit events to and from each other.

socket.io-redis is used this way :

var io = require('socket.io')(3000);
var redis = require('socket.io-redis');
io.adapter(redis({ host: 'localhost', port: 6379 }));

Also

I think you are not using socket.io v1.0.0. You might want to update your version in order to get more stability.

You can check their migration guide at http://socket.io/docs/migrating-from-0-9/

like image 63
Ludovic C Avatar answered Sep 21 '22 06:09

Ludovic C


There is a step missing from the socket.io docs when using

var io = require('socket.io')(3000);
var redis = require('socket.io-redis');
io.adapter(redis({ host: 'localhost', port: 6379 }));

You need to tell the client that you want to use 'websockets' as the only form of transport or it will not work... so for the constructor on the client use

io.connect(yourURL , { transports : ['websocket']});

see my answer to a similar question here ( my answer might be more appropriate on this thread ): https://stackoverflow.com/a/30791006/4127352

like image 31
Squivo Avatar answered Sep 23 '22 06:09

Squivo