How can the Observable class be extended by applying built-in RxJS operators to it?
I would like to do something like this:
class TruthyObservable extends Observable {
  constructor(subscriber) {
    super(subscriber);
    return this.filter(x => x);
  }
}
class TruthyMappedObservable extends TruthyObservable {
  constructor(subscriber) {
    super(subscriber);
    return this.map(x => `'${x}'`);
  }
}
Can this be done without returning a value from the constructor?
This depends on what exactly you want to do, but let's say you want a TruthyObservable that behaves much like the default Observable.create(...) and, for the sake of the example, passes only even numbers:
import { Observable, Observer, Subscriber, Subject, Subscription } from 'rxjs';
import 'rxjs/add/operator/filter';
class TruthyObservable<T> extends Observable<T> {
    constructor(subscribe?: <R>(this: Observable<T>, subscriber: Subscriber<R>) => any) {
        if (subscribe) {
            let oldSubscribe = subscribe;
            subscribe = (obs: Subscriber<any>) => {
                obs = this.appendOperators(obs);
                return oldSubscribe.call(this, obs);
            };
        }
        super(subscribe);
    }
    private appendOperators(obs: Subscriber<any>) {
        let subject = new Subject();
        subject
            .filter((val: number) => val % 2 === 0)
            .subscribe(obs);
        return new Subscriber(subject);
    }
}
let o = new TruthyObservable<number>((obs: Observer<number>) => {
    obs.next(3);
    obs.next(6);
    obs.next(7);
    obs.next(8);
});
o.subscribe(val => console.log(val));
This prints to console:
6
8
See live demo: https://jsbin.com/recuto/3/edit?js,console
Usually, classes that inherit from Observable override the _subscribe() method, which makes the actual subscription internally, but in our case we want to use the callback where we can emit values ourselves (since this Observable doesn't emit anything on its own). The _subscribe() method is shadowed by the _subscribe property if it exists, so we wouldn't be able to append any operators if we just overrode the method. That's why I wrap the subscribe callback in the constructor with another function and then pass all values through a Subject chained with filter() in the appendOperators() method. Note that at obs = this.appendOperators(obs) I replaced the original Observer with a Subscriber that feeds the Subject.
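The shadowing mentioned above can be shown without RxJS at all. The following is only a minimal sketch: MiniObservable and OverridingObservable are hypothetical stand-ins, and the assumption is that the Observable constructor assigns the callback to an instance property named _subscribe, as described in the paragraph above.

```typescript
// Hypothetical stand-in for the relevant part of Observable; this is not RxJS code.
type SubscribeFn = () => string;

class MiniObservable {
    constructor(subscribe?: SubscribeFn) {
        if (subscribe) {
            // Assumption: the callback is stored as an own property of the instance,
            // which takes precedence over any method found on the prototype chain.
            this._subscribe = subscribe;
        }
    }

    _subscribe(): string {
        return 'base method';
    }
}

class OverridingObservable extends MiniObservable {
    // Overriding the method only changes what lives on the prototype.
    _subscribe(): string {
        return 'overridden method';
    }
}

// No callback: the overridden prototype method is used.
const withoutCallback = new OverridingObservable()._subscribe();

// With a callback: the own property wins and the override is never reached.
const withCallback = new OverridingObservable(() => 'constructor callback')._subscribe();

console.log(withoutCallback); // overridden method
console.log(withCallback);    // constructor callback
```

As soon as a callback is passed to the constructor, the override is bypassed, which is why the wrapping has to happen around the callback itself.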
In the end, when I call e.g. obs.next(3), I'm in fact pushing values into the Subject, which filters them and passes them on to the original Observer.
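Stripped of the RxJS machinery, the idea is that the callback never sees the real observer, only a proxy that filters before forwarding. Here is a rough sketch of just that idea; MiniObserver, Producer and withFilter are made-up names for illustration, and a plain object plays the role that the Subject plus filter() plays in the answer.

```typescript
// Hypothetical minimal observer shape, standing in for an RxJS Observer.
interface MiniObserver<T> {
    next(value: T): void;
}

// A producer is the callback that emits values, like the one passed to the constructor.
type Producer<T> = (observer: MiniObserver<T>) => void;

// Wraps a producer so that it receives a filtering proxy instead of the real observer.
function withFilter<T>(producer: Producer<T>, predicate: (value: T) => boolean): Producer<T> {
    return (observer) => {
        const proxy: MiniObserver<T> = {
            next(value) {
                // Forward only the values that satisfy the predicate.
                if (predicate(value)) {
                    observer.next(value);
                }
            },
        };
        producer(proxy);
    };
}

const received: number[] = [];

const evenOnly = withFilter<number>(
    (obs) => {
        obs.next(3);
        obs.next(6);
        obs.next(7);
        obs.next(8);
    },
    (val) => val % 2 === 0,
);

// The "real" observer just records whatever reaches it.
evenOnly({ next: (val) => received.push(val) });

console.log(received); // [ 6, 8 ]
```

The Subject-based version in the answer does the same thing, with the advantage that any chain of existing RxJS operators can be placed between the proxy and the original Observer.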