
Extend RxJS Observable class with operators


How can the Observable class be extended by applying built-in RxJS operators to it?

I would like to do something like this:

class TruthyObservable extends Observable {
  constructor(subscriber) {
    super(subscriber);

    return this.filter(x => x);
  }
}

class TruthyMappedObservable extends TruthyObservable {
  constructor(subscriber) {
    super(subscriber);

    return this.map(x => `'${x}'`);
  }
}

Can this be done without constructor return?

Estus Flask asked Dec 28 '16 17:12


1 Answer

This depends largely on what you want to do, but let's say you want a TruthyObservable that behaves much like the default Observable.create(...) but passes only even numbers:

import { Observable, Observer, Subscriber, Subject, Subscription } from 'rxjs';
import 'rxjs/add/operator/filter';

class TruthyObservable<T> extends Observable<T> {

    constructor(subscribe?: <R>(this: Observable<T>, subscriber: Subscriber<R>) => any) {
        if (subscribe) {
            // Wrap the user's callback so it receives the filtering Subscriber
            let oldSubscribe = subscribe;
            subscribe = (obs: Subscriber<any>) => {
                obs = this.appendOperators(obs);
                return oldSubscribe.call(this, obs);
            };
        }

        super(subscribe);
    }

    private appendOperators(obs: Subscriber<any>) {
        let subject = new Subject();

        // Values pushed to the Subject reach the original Subscriber only if they are even
        subject
            .filter((val: number) => val % 2 == 0)
            .subscribe(obs);

        return new Subscriber(subject);
    }

}

let o = new TruthyObservable<number>((obs: Observer<number>) => {
    obs.next(3);
    // ...
});

o.subscribe(val => console.log(val));

This prints to console:


See live demo: https://jsbin.com/recuto/3/edit?js,console

Usually, classes that inherit from Observable override the _subscribe() method, which actually makes the subscription internally, but in our case we want to use the callback where we can emit values ourselves (since this Observable doesn't emit anything itself). The _subscribe() method is shadowed by the _subscribe property if it exists, so we wouldn't be able to append any operators to it if we just overrode this method. That's why I wrap _subscribe in the constructor with another function and then pass all values through a Subject chained with filter() in the appendOperators() method. Note that I replaced the original Observer with the Subject at obs = this.appendOperators(obs).
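The shadowing mentioned above is ordinary JavaScript property lookup, and it can be seen without RxJS. The following is only a minimal sketch: Base and Derived are hypothetical stand-ins, and it assumes the base constructor stores the callback as an own _subscribe property, as the paragraph describes for Observable.

```typescript
// Hypothetical stand-in for a base class that stores the constructor callback
// as an own instance property named _subscribe.
class Base {
    constructor(subscribe?: () => string) {
        if (subscribe) {
            // Own property: looked up before any prototype method of the same name.
            (this as any)._subscribe = subscribe;
        }
    }

    _subscribe(): string {
        return 'Base.prototype._subscribe';
    }

    run(): string {
        return this._subscribe();
    }
}

// A subclass that only overrides the method.
class Derived extends Base {
    _subscribe(): string {
        return 'Derived.prototype._subscribe';
    }
}

const withCallback = new Derived(() => 'constructor callback').run();
const withoutCallback = new Derived().run();

console.log(withCallback);    // constructor callback
console.log(withoutCallback); // Derived.prototype._subscribe
```

When a callback is passed, the override in Derived is never reached, which is why the answer wraps the callback instead of overriding the method.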

In the end, when I call e.g. obs.next(3), I'm in fact pushing values to the Subject, which filters them and passes them on to the original Observer.
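The same flow can be traced in a dependency-free sketch. MiniObserver, filteredObserver and subscribeFiltered are hypothetical names made up for illustration, not RxJS APIs, and the emitted value 4 is an assumed example. The producer only ever sees the filtering observer, which forwards even values to the original one.

```typescript
// Minimal observer shape used only in this sketch (not the RxJS interface).
interface MiniObserver<T> {
    next(value: T): void;
}

// Hypothetical helper: returns an observer that forwards to `target`
// only the values accepted by `predicate`.
function filteredObserver<T>(
    target: MiniObserver<T>,
    predicate: (value: T) => boolean,
): MiniObserver<T> {
    return {
        next(value: T): void {
            if (predicate(value)) {
                target.next(value);
            }
        },
    };
}

// Hypothetical stand-in for the wrapped constructor callback: the producer
// receives the filtering observer instead of the original one.
function subscribeFiltered(
    producer: (observer: MiniObserver<number>) => void,
    original: MiniObserver<number>,
): void {
    producer(filteredObserver(original, (value) => value % 2 === 0));
}

const received: number[] = [];
subscribeFiltered(
    (observer) => {
        observer.next(3); // odd, dropped by the filter
        observer.next(4); // even, forwarded to the original observer
    },
    { next: (value) => { received.push(value); } },
);

console.log(received); // [ 4 ]
```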

martin answered Sep 26 '22 16:09
