Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do I create an observable from the onmessage callback?

I am fairly used to RX having used it in .NET and Java, I am expecting to be able to do the following:

Rx.Observable.fromCallback(websocket.onmessage)
    .map(...)
    .subscribe(...);

however, the console has the following:

Uncaught TypeError: Rx.Observable.fromCallback(websocket.onmessage).map is not a function

which would appear to indicate that the fromCallback is not returning an Observable.

What am I doing wrong here? Have I misunderstood what fromCallback is doing and I need to use a Subject? Can I not wrap some arbitrary handler in an observable?

like image 215
Cheetah Avatar asked Dec 24 '22 15:12

Cheetah


1 Answers

You are actually looking for fromEvent or fromEventPattern:

Rx.Observable.fromEvent(websocket, 'message').map(/*...*/).subscribe();

Rx.Observable.fromEventPattern(
  function add(h) { websocket.addEventListener(h); }, 
  function remove(h) { websocket.removeEventListener(h); })
 .map(/*...*/)
 .subscribe();

The first one will attempt to use some of the standard ways of subscribing to an event emitter, which WebSocket is. However, if that fails you can use fromEventPattern instead to specify how handlers are added or removed from your object.

One additional note, JavaScript does not pass along an implicit reference to the instance of the object you are using as C# and Java do, so your code fromCallback(websocket.onmessage) is not passing along websocket, it is passing along the reference to the method from the function prototype. this will be determined at execution time.

Rx.Observable.fromCallback is for functions whose last argument is a callback function which is a standard pattern for asynchronous JavaScript code. Further, the fromCallback method does not return an Observable it returns a function that when called returns an Observable i.e.

function methodWithCallback(arg0, arg1, cb) {
  setTimeout(function() {
    cb(arg0 + arg1);
  }, 2000);
}

var newMethod = Rx.Observable.fromCallback(methodWithCallback);

//[After 2 seconds] 3
newMethod(1, 2).subscribe(console.log.bind(console));
like image 76
paulpdaniels Avatar answered Dec 27 '22 05:12

paulpdaniels