Unsubscribe from RxJS Observables

I have these two objects, and I want to stop listening to their events. I am totally new to observables and RxJS and just trying to work with the Inquirer library.

Here is the RxJS API for reference: http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html

How can I unsubscribe from these types of observables?

ConnectableObservable:

   ConnectableObservable {
     source: EventPatternObservable { _add: [Function], _del: [Function], _fn: undefined },
     _connection: ConnectDisposable { _p: [Circular], _s: [Object] },
     _source: AnonymousObservable { source: [Object], __subscribe: [Function: subscribe] },
     _subject: 
      Subject {
        isDisposed: false,
        isStopped: false,
        observers: [Object],
        hasError: false } },
  _count: 1,
  _connectableSubscription: 
   ConnectDisposable {
     _p: 
      ConnectableObservable {
        source: [Object],
        _connection: [Circular],
        _source: [Object],
        _subject: [Object] },
     _s: AutoDetachObserver { isStopped: false, observer: [Object], m: [Object] } } }

FilterObservable:

FilterObservable {
  source: 
   RefCountObservable {
     source: 
      ConnectableObservable {
        source: [Object],
        _connection: [Object],
        _source: [Object],
        _subject: [Object] },
     _count: 1,
     _connectableSubscription: ConnectDisposable { _p: [Object], _s: [Object] } },
  predicate: [Function] }

I need to unsubscribe from these objects:

'use strict';
var rx = require('rx');

function normalizeKeypressEvents(value, key) {
  return {value: value, key: key || {}};
}

module.exports = function (rl) {

  var keypress = rx.Observable.fromEvent(rl.input, 'keypress', normalizeKeypressEvents)
    .filter(function (e) {
      // Ignore `enter` key. On the readline, we only care about the `line` event.
      return e.key.name !== 'enter' && e.key.name !== 'return';
    });

  return {
    line: rx.Observable.fromEvent(rl, 'line'),

    keypress: keypress,

    normalizedLeftKey: keypress.filter(function (e) {
      return e.key.name === 'left';
    }).share(),

    normalizedRightKey: keypress.filter(function (e) {
      return e.key.name === 'right';
    }).share(),

    normalizedUpKey: keypress.filter(function (e) {
      return e.key.name === 'up' || e.key.name === 'k' || (e.key.name === 'p' && e.key.ctrl);
    }).share(),

    normalizedDownKey: keypress.filter(function (e) {
      return e.key.name === 'down' || e.key.name === 'j' || (e.key.name === 'n' && e.key.ctrl);
    }).share(),

    numberKey: keypress.filter(function (e) {
      return e.value && '123456789'.indexOf(e.value) >= 0;
    }).map(function (e) {
      return Number(e.value);
    }).share(),

    spaceKey: keypress.filter(function (e) {
      return e.key && e.key.name === 'space';
    }).share(),

    aKey: keypress.filter(function (e) {
      return e.key && e.key.name === 'a';
    }).share(),

    iKey: keypress.filter(function (e) {
      return e.key && e.key.name === 'i';
    }).share()
  };
};

My current best guess is that no explicit call to subscribe is happening like this:

var source = Rx.Observable.fromEvent(input, 'click');

var subscription = source.subscribe(
  function (x) {
    console.log('Next: Clicked!');
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  });

but instead, there are these calls:

events.normalizedUpKey.takeUntil(validation.success).forEach(this.onUpKey.bind(this));
events.normalizedDownKey.takeUntil(validation.success).forEach(this.onDownKey.bind(this));

so my best guess is that I need a way to nullify/cancel the takeUntil call.

Alexander Mills asked Nov 12 '16 05:11

2 Answers

If you want to unsubscribe, you need the Subscription object. That's the object returned from every subscribe() call on an Observable instance. For example:

let subscription = observable.subscribe(...);
...
subscription.unsubscribe();

For more info see: https://github.com/ReactiveX/rxjs/blob/master/doc/subscription.md
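
To make concrete what the returned object does, here is a minimal sketch that uses only Node's built-in `events` module. `fromEventSketch` is a hypothetical stand-in written for this illustration, not the RxJS implementation; it only mirrors the idea that subscribing attaches a listener and the returned handle detaches it. Note that the question uses `require('rx')` and its dump shows `ConnectDisposable`, which suggests RxJS 4, where the returned handle exposes `dispose()` rather than `unsubscribe()`.

```javascript
'use strict';
const { EventEmitter } = require('events');

// Hypothetical stand-in for an event-based observable (NOT the library code).
// subscribe() attaches a listener and returns a handle whose unsubscribe()
// detaches that same listener again.
function fromEventSketch(emitter, eventName) {
  return {
    subscribe(onNext) {
      let closed = false;
      emitter.on(eventName, onNext);
      return {
        unsubscribe() {
          if (closed) return; // calling it twice is harmless
          closed = true;
          emitter.removeListener(eventName, onNext);
        }
      };
    }
  };
}

const input = new EventEmitter();
const received = [];

const subscription = fromEventSketch(input, 'keypress').subscribe(function (key) {
  received.push(key);
});

input.emit('keypress', 'up');   // delivered to the handler
subscription.unsubscribe();     // listener is removed here
input.emit('keypress', 'down'); // no longer delivered

console.log(received);                        // [ 'up' ]
console.log(input.listenerCount('keypress')); // 0
```

The point of the sketch is that the handle has to be captured at the moment of subscribing; without it there is nothing to call later.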

martin answered Sep 23 '22 01:09


I second what the first answer says. However, it feels like there needs to be a call like this somewhere in the code:

let subscription = normalizedUpKey.subscribe( data => console.log(data) );

so that you could then call

subscription.unsubscribe()

on the result. How else would you know when something happens, or is that part of the 3rd party library?
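
In the question, the chains end in `.takeUntil(validation.success).forEach(...)`. As far as I can tell from the RxJS 4 documentation, `forEach` is an alias of `subscribe` there, so its return value would be the handle to keep; treat that as an assumption to verify against the version you have installed. The sketch below shows the same shape with plain Node `EventEmitter`s: listening stops either when a notifier fires or when the returned handle is cancelled by hand. `listenUntil` and the event names are hypothetical, made up for this example, and are not part of RxJS or Inquirer.

```javascript
'use strict';
const { EventEmitter } = require('events');

// Hypothetical helper: handle `eventName` on `source` until `stopName`
// fires on `notifier`, or until the returned handle is cancelled manually.
function listenUntil(source, eventName, notifier, stopName, handler) {
  let active = true;

  function stop() {
    if (!active) return;
    active = false;
    source.removeListener(eventName, handler);
    notifier.removeListener(stopName, stop);
  }

  source.on(eventName, handler);
  notifier.once(stopName, stop);

  // Keep this handle around if you want to cancel early.
  return { unsubscribe: stop };
}

const keys = new EventEmitter();
const validation = new EventEmitter();
const seen = [];

const handle = listenUntil(keys, 'up', validation, 'success', function () {
  seen.push('up');
});

keys.emit('up');      // handled
handle.unsubscribe(); // manual cancel, before any 'success' event
keys.emit('up');      // ignored, the listener is gone

console.log(seen);                                // [ 'up' ]
console.log(keys.listenerCount('up'));            // 0
console.log(validation.listenerCount('success')); // 0
```

So there is no need to "nullify" the `takeUntil` itself: cancelling the handle returned at the end of the chain tears down everything upstream of it.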

To read more about RxJS, I suggest having a look at this free book: https://www.gitbook.com/book/chrisnoring/rxjs-5-ultimate/details

Chris Noring answered Sep 22 '22 01:09