Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Trying to make my own RxJs observable

Tags:

node.js

rxjs

I'm trying to convert an existing API to work with RxJS... fairly new to node, and very new to RxJs, so please bear with me.

I have an existing API (getNextMessage), that either blocks (asynchronously), or returns a new item or error via a node-style (err, val) callback, when the something becomes available.

so it looks something like:

getNextMessage(nodeStyleCompletionCallback);

You could think of getNextMessage like an http request, that completes in the future, when the server responds, but you do need to call getNextMessage again, once a message is received, to keep getting new items from the server.

So, in order to make it into an observable collection, I have to get RxJs to keep calling my getNextMessage function until the subscriber is disposed();

Basically, I'm trying to create my own RxJs observable collection.

The problems are:

  1. I don't know how to make subscriber.dispose() kill the async.forever
  2. I probably shouldn't be using async.forever in the first place
  3. I'm not sure I should be even getting 'completed' for each message - shouldn't that be at the end of a sequence
  4. I'd like to eventually remove the need for using fromNodeCallback, to have a first class RxJS observable
  5. Clearly I'm a little confused.

Would love a bit of help, thanks!

Here is my existing code:

var Rx = require('rx');
var port = require('../lib/port');
var async = require('async');

function observableReceive(portName)
{
    var observerCallback;
    var listenPort = new port(portName);
    var disposed = false;

    var asyncReceive = function(asyncCallback)
    {
        listenPort.getNextMessage(
            function(error, json)
            {
                observerCallback(error, json);

                if (!disposed)
                    setImmediate(asyncCallback);
            }
        );
    }

    return function(outerCallback)
    {
        observerCallback = outerCallback;
        async.forever(asyncReceive);
    }
}

var receive = Rx.Observable.fromNodeCallback(observableReceive('rxtest'));
var source = receive();

var subscription = source.forEach(
    function (json)
    {
        console.log('receive completed: ' + JSON.stringify(json));
    },
    function (error) {
        console.log("receive failed: " + error.toString());
    },
    function () {
        console.log('Completed');
        subscription.dispose();
    }
);
like image 551
user3291110 Avatar asked Feb 10 '14 01:02

user3291110


Video Answer


1 Answers

So here's probably what I would do.

var Rx = require('Rx');

// This is just for kicks. You have your own getNextMessage to use. ;)
var getNextMessage = (function(){

  var i = 1;

  return function (callback) {
    setTimeout(function () {
      if (i > 10) {
        callback("lawdy lawd it's ova' ten, ya'll.");
      } else {
        callback(undefined, i++);
      }
    }, 5);
  };

}());

// This just makes an observable version of getNextMessage.
var nextMessageAsObservable = Rx.Observable.create(function (o) {
  getNextMessage(function (err, val) {
    if (err) {
      o.onError(err);
    } else {
      o.onNext(val);
      o.onCompleted(); 
    }
  });
});

// This repeats the call to getNextMessage as many times (11) as you want.
// "take" will cancel the subscription after receiving 11 items.
nextMessageAsObservable
  .repeat()
  .take(11)
  .subscribe(
    function (x)   { console.log('next', x);    },
    function (err) { console.log('error', err); },
    function ()    { console.log('done');       }
  );
like image 82
cwharris Avatar answered Sep 28 '22 12:09

cwharris