
How to properly turn a ws server into an RxJS API without Subjects in Node.js

What's the proper way to turn the famous ws module into a reactive API in Node.js? I understand that subjects can help with bridging non-reactive events to reactive ones, but the problem is that they have a much harder time disposing of their dependent objects.

var WebSocketServer = require('ws').Server;
var wss = new WebSocketServer({ port: 8080 });
var Rx = require('rx');  


var connectionMessageSubject = new Rx.Subject();

wss.on('connection', function connection(client) {
  client.on('message', function incoming(message) {
    connectionMessageSubject.onNext({
      client: client,
      message: message
    });
  });
});

I can't use RxJS's built-in fromEvent method because it registers so many event listeners that Node.js throws a warning when 30 or more clients connect.

For example...

var WebSocketServer = require('ws').Server;
var Rx = require('rx');
var wss = new WebSocketServer({port:8080});

var connectionMessageObservable;

// This uses a tremendous amount of memory and triggers warnings that the event emitter has a maximum of 30 listeners.
wss.on('connection', function connection(client){
  // Note: each new connection overwrites the previous observable.
  connectionMessageObservable = Rx.Observable.fromEvent(client, 'message');
});
Max Alexander asked Feb 19 '15 02:02



1 Answer

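The following code reproduces the subject's behavior without using a Subject: the ws event handlers are registered inside an Observable, which pushes each message to its subscriber.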

var WebSocketServer = require('ws').Server;
var Rx = require('rxjs'); // assumed: RxJS 5+ (the "rxjs" package), whose observers expose next()
var wss = new WebSocketServer({port:8080});

var connectionMessage$ = new Rx.Observable(function (observer) {
    wss.on('connection', function connection(client){
        client.on('message', function (message){
            observer.next({
                client: client,
                message: message,
            })
        });
    });    
});

connectionMessage$.subscribe(function (cm) {
    // cm.client for client
    // cm.message for message
});
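
Since the question is mostly about disposal, it may help to see the listener bookkeeping on its own. The function passed to the Observable constructor can return a teardown function that runs on unsubscribe. The sketch below is not part of the code above: watchMessages is a hypothetical helper, plain EventEmitter instances stand in for the ws server and its clients, and the onMessage callback plays the role of observer.next.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper (not part of ws or RxJS): attaches the listeners and
// returns a teardown function that detaches every one of them again.
// Inside an Observable, onMessage would be observer.next and the returned
// function would be the teardown handed back to RxJS.
function watchMessages(server, onMessage) {
  const cleanups = new Set(); // one cleanup function per connected client

  function onConnection(client) {
    const onClientMessage = (message) => onMessage({ client, message });

    const cleanup = () => {
      client.removeListener('message', onClientMessage);
      client.removeListener('close', onClose);
      cleanups.delete(cleanup);
    };

    // Drop the bookkeeping for a client as soon as it disconnects.
    function onClose() {
      cleanup();
    }

    client.on('message', onClientMessage);
    client.on('close', onClose);
    cleanups.add(cleanup);
  }

  server.on('connection', onConnection);

  return function teardown() {
    server.removeListener('connection', onConnection);
    // Copy the set first, because each cleanup removes itself from it.
    for (const cleanup of Array.from(cleanups)) {
      cleanup();
    }
  };
}
```

With this shape there is exactly one 'connection' listener on the server and one 'message' listener per client, and calling the returned function removes all of them, so nothing stays registered after the subscriber goes away.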
Rafael Kallis answered Sep 29 '22 16:09