Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Observable subject event listener

I was looking into Observables and their differences to EventEmitter and then stumbled upon Subjects ( which I can see Angulars EventEmitter is based off ).

It seems Observables are unicast vs Subjects that are multicast ( and then an EE is simply a subject that wraps .next in emit to give the correct interface ).

Observables seem easy enough to implement

class Observable {
    constructor(subscribe) {
        this._subscribe = subscribe;
    }

    subscribe(next, complete, error) {
        const observer = new Observer(next, complete, error);

        // return way to unsubscribe
        return this._subscribe(observer);
    }

}

Where Observer is just a wrapper that adds some try catches and monitors isComplete so it can clean up and stop observing.

For a Subject I came up with:

class Subject {
    subscribers = new Set();

    constructor() {
        this.observable = new Observable(observer => {
            this.observer = observer;
        });

        this.observable.subscribe((...args) => {
            this.subscribers.forEach(sub => sub(...args))
        });
    }

    subscribe(subscriber) {
        this.subscribers.add(subscriber);
    }

    emit(...args) {
        this.observer.next(...args);
    }
}

which sort of merges into an EventEmitter with it wrapping .next with emit - but capturing the observe argument of the Observable seems wrong - and like I have just hacked up a solution. What would be the better way to produce a Subject (multicast) from an Observable (unicast)?

I tried looking at RXJS but I can't see how it's subscribers array ever gets populated :/

like image 959
CWright Avatar asked Sep 29 '20 09:09

CWright


1 Answers

I think you can have a better understanding by using the debugger as well. Open a StackBlitz RxJS project, create the simplest example(depending on what you're trying to understand) and then place some breakpoints. AFAIK, with StackBlitz you can debug the TypeScript files, which seems great.


Firstly, the Subject class extends Observable:

export class Subject<T> extends Observable<T> implements SubscriptionLike { /* ... */ }

Now let's examine the Observable class.

It has the well-known pipe method:

pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
  return operations.length ? pipeFromArray(operations)(this) : this;
}

where pipeFromArray is defined as follows:

export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
  if (fns.length === 0) {
    return identity as UnaryFunction<any, any>;
  }

  if (fns.length === 1) {
    return fns[0];
  }

  return function piped(input: T): R {
    return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
  };
}

Before clarifying what's going on in the above snippet, it is important to know that operators are. An operator is a function which returns another function whose single argument is an Observable<T> and whose return type is an Observable<R>. Sometimes, T and R can be the same(e.g when using filter(), debounceTime()...).

For example, map is defined like this:

export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> {
  return operate((source, subscriber) => {
    // The index of the value from the source. Used with projection.
    let index = 0;
    // Subscribe to the source, all errors and completions are sent along
    // to the consumer.
    source.subscribe(
      new OperatorSubscriber(subscriber, (value: T) => {
        // Call the projection function with the appropriate this context,
        // and send the resulting value to the consumer.
        subscriber.next(project.call(thisArg, value, index++));
      })
    );
  });
}

export function operate<T, R>(
  init: (liftedSource: Observable<T>, subscriber: Subscriber<R>) => (() => void) | void
): OperatorFunction<T, R> {
  return (source: Observable<T>) => {
    if (hasLift(source)) {
      return source.lift(function (this: Subscriber<R>, liftedSource: Observable<T>) {
        try {
          return init(liftedSource, this);
        } catch (err) {
          this.error(err);
        }
      });
    }
    throw new TypeError('Unable to lift unknown Observable type');
  };
}

So, operate will return a function. Notice its argument: source: Observable<T>. The return type is derived from Subscriber<R>.

Observable.lift just creates a new Observable. It's like creating nodes within a liked list.

protected lift<R>(operator?: Operator<T, R>): Observable<R> {
  const observable = new Observable<R>();
  
  // it's important to keep track of the source !
  observable.source = this;
  observable.operator = operator;
  return observable;
}

So, an operator(like map) will return a function. What invokes that function is the pipeFromArray function:

export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
  if (fns.length === 0) {
    return identity as UnaryFunction<any, any>;
  }

  if (fns.length === 1) {
    return fns[0];
  }

  return function piped(input: T): R {
    // here the functions returned by the operators are being called
    return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
  };
}

In the above snippet, fn is what the operate function returns:

return (source: Observable<T>) => {
  if (hasLift(source)) { // has `lift` method
    return source.lift(function (this: Subscriber<R>, liftedSource: Observable<T>) {
      try {
        return init(liftedSource, this);
      } catch (err) {
        this.error(err);
      }
    });
  }
  throw new TypeError('Unable to lift unknown Observable type');
};

Maybe it would be better to see an example as well. I'd recommend trying this yourself with a debugger.

const src$ = new Observable(subscriber => {subscriber.next(1), subscriber.complete()});

The subscriber => {} callback fn will be assigned to the Observable._subscribe property.

constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
  if (subscribe) {
    this._subscribe = subscribe;
  }
}

Next, let's try adding an operator:

const src2$ = src$.pipe(map(num => num ** 2))

In this case, it will invoke this block from pipeFromArray:

// `pipeFromArray`
if (fns.length === 1) {
  return fns[0];
}

// `Observable.pipe`
pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
  return operations.length ? pipeFromArray(operations)(this) : this;
}

So, the Observable.pipe will invoke (source: Observable<T>) => { ... }, where source is the src$ Observable. By invoking that function(whose result is stored in src2$), it will also call the Observable.lift method.

return source.lift(function (this: Subscriber<R>, liftedSource: Observable<T>) {
  try {
    return init(liftedSource, this);
  } catch (err) {
    this.error(err);
  }
});

/* ... */

protected lift<R>(operator?: Operator<T, R>): Observable<R> {
  const observable = new Observable<R>();
  observable.source = this;
  observable.operator = operator;
  return observable;
}

At this point, src$ is an Observable instance, which has the source set to src$ and the operator set to function (this: Subscriber<R>, liftedSource: Observable<T>) ....

From my perspective, it's all about linked lists. When creating the Observable chain(by adding operators), the list is created from top to bottom.
When the tail node has its subscribe method called, another list will be created, this time from bottom to top. I like to call the first one the Observable list and the second one the Subscribers list.

src2$.subscribe(console.log)

This is what happens when the subscribe method is called:

const subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext, error, complete);
  
  const { operator, source } = this;
  subscriber.add(
    operator
      ? operator.call(subscriber, source)
      : source || config.useDeprecatedSynchronousErrorHandling
      ? this._subscribe(subscriber)
      : this._trySubscribe(subscriber)
  );

  return subscriber;

In this case src2$ has an operator, so it will call that. operator is defined as:

function (this: Subscriber<R>, liftedSource: Observable<T>) {
  try {
    return init(liftedSource, this);
  } catch (err) {
    this.error(err);
  }
}

where init depends on the operator that is used. Once again, here is map's init

export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> {
  return operate( /* THIS IS `init()` */(source, subscriber) => {
    
    // The index of the value from the source. Used with projection.
    let index = 0;
    // Subscribe to the source, all errors and completions are sent along
    // to the consumer.
    source.subscribe(
      new OperatorSubscriber(subscriber, (value: T) => {
        // Call the projection function with the appropriate this context,
        // and send the resulting value to the consumer.
        subscriber.next(project.call(thisArg, value, index++));
      })
    );
  });
}

source is in fact src$. When source.subscribe() is called, it will end up calling the callback provided to new Observable(subscriber => { ... }). Calling subscriber.next(1) will call the (value: T) => { ... } from above, which will call subscriber.next(project.call(thisArg, value, index++));(project - the callback provided to map). Lastly, subscriber.next refers to console.log.

Coming back to Subject, this is what happens when the _subscribe method is called:

protected _subscribe(subscriber: Subscriber<T>): Subscription {
  this._throwIfClosed(); // if unsubscribed
  this._checkFinalizedStatuses(subscriber); // `error` or `complete` notifications
  return this._innerSubscribe(subscriber);
}

protected _innerSubscribe(subscriber: Subscriber<any>) {
  const { hasError, isStopped, observers } = this;
  return hasError || isStopped
    ? EMPTY_SUBSCRIPTION
    : (observers.push(subscriber), new Subscription(() => arrRemove(this.observers, subscriber)));
}

So, this is how Subject's list of subscribers are is populated. By returning new Subscription(() => arrRemove(this.observers, subscriber)), it ensures that then subscriber unsubscribes(due to complete/error notifications or simply subscriber.unsubscribe()), the inactive subscriber will be removed from the Subject's list.

like image 75
Andrei Gătej Avatar answered Nov 11 '22 23:11

Andrei Gătej