I was looking into Observables and how they differ from EventEmitter, and then stumbled upon Subjects (which I can see Angular's EventEmitter is based on).
It seems Observables are unicast whereas Subjects are multicast (and an EE is then simply a Subject that wraps .next in emit to give the correct interface).
Observables seem easy enough to implement:
class Observable {
  constructor(subscribe) {
    this._subscribe = subscribe;
  }
  subscribe(next, complete, error) {
    const observer = new Observer(next, complete, error);
    // return way to unsubscribe
    return this._subscribe(observer);
  }
}
Where Observer is just a wrapper that adds some try/catches and monitors isComplete so it can clean up and stop observing.
For a Subject I came up with:
class Subject {
  subscribers = new Set();
  constructor() {
    this.observable = new Observable(observer => {
      this.observer = observer;
    });
    this.observable.subscribe((...args) => {
      this.subscribers.forEach(sub => sub(...args));
    });
  }
  subscribe(subscriber) {
    this.subscribers.add(subscriber);
  }
  emit(...args) {
    this.observer.next(...args);
  }
}
which sort of merges into an EventEmitter, with emit wrapping .next. But capturing the observer argument of the Observable seems wrong, as if I have just hacked up a solution. What would be a better way to produce a Subject (multicast) from an Observable (unicast)?
I tried looking at RxJS but I can't see how its subscribers array ever gets populated :/
I think you can get a better understanding by using the debugger as well. Open a StackBlitz RxJS project, create the simplest example (depending on what you're trying to understand) and then place some breakpoints. AFAIK, with StackBlitz you can debug the TypeScript files, which seems great.
Firstly, the Subject class extends Observable:
export class Subject<T> extends Observable<T> implements SubscriptionLike { /* ... */ }
Now let's examine the Observable class.
It has the well-known pipe method:
pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
  return operations.length ? pipeFromArray(operations)(this) : this;
}
where pipeFromArray is defined as follows:
export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
  if (fns.length === 0) {
    return identity as UnaryFunction<any, any>;
  }
  if (fns.length === 1) {
    return fns[0];
  }
  return function piped(input: T): R {
    return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
  };
}
Before clarifying what's going on in the above snippet, it is important to know what operators are. An operator is a function which returns another function whose single argument is an Observable<T> and whose return type is an Observable<R>. Sometimes, T and R can be the same (e.g. when using filter(), debounceTime(), ...).
For example, map is defined like this:
export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> {
  return operate((source, subscriber) => {
    // The index of the value from the source. Used with projection.
    let index = 0;
    // Subscribe to the source, all errors and completions are sent along
    // to the consumer.
    source.subscribe(
      new OperatorSubscriber(subscriber, (value: T) => {
        // Call the projection function with the appropriate this context,
        // and send the resulting value to the consumer.
        subscriber.next(project.call(thisArg, value, index++));
      })
    );
  });
}
export function operate<T, R>(
  init: (liftedSource: Observable<T>, subscriber: Subscriber<R>) => (() => void) | void
): OperatorFunction<T, R> {
  return (source: Observable<T>) => {
    if (hasLift(source)) {
      return source.lift(function (this: Subscriber<R>, liftedSource: Observable<T>) {
        try {
          return init(liftedSource, this);
        } catch (err) {
          this.error(err);
        }
      });
    }
    throw new TypeError('Unable to lift unknown Observable type');
  };
}
So, operate will return a function. Notice its argument: source: Observable<T>. The return type, Observable<R>, follows from Subscriber<R>.
Observable.lift just creates a new Observable. It's like creating nodes within a linked list.
protected lift<R>(operator?: Operator<T, R>): Observable<R> {
  const observable = new Observable<R>();
  // it's important to keep track of the source !
  observable.source = this;
  observable.operator = operator;
  return observable;
}
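To make the linked-list analogy concrete, here is a minimal sketch. ObservableNode and chainOf are hypothetical names invented for this illustration and are not part of RxJS; the operator is stored as a plain label instead of a function to keep things short.

```typescript
// Simplified stand-in for an Observable; NOT the RxJS class.
class ObservableNode {
  source?: ObservableNode;
  operator?: string; // RxJS stores a function here; a label is enough for this sketch

  constructor(public readonly name: string) {}

  // Same shape as `lift`: create a new node that points back at `this`.
  lift(name: string, operator: string): ObservableNode {
    const node = new ObservableNode(name);
    node.source = this;
    node.operator = operator;
    return node;
  }
}

// Walk from the tail back to the head by following the `source` links.
function chainOf(tail: ObservableNode): string[] {
  const names: string[] = [];
  for (let n: ObservableNode | undefined = tail; n; n = n.source) {
    names.push(n.name);
  }
  return names;
}

const head = new ObservableNode("src$");
const mapped = head.lift("mapped$", "map");
const filtered = mapped.lift("filtered$", "filter");

console.log(chainOf(filtered)); // [ 'filtered$', 'mapped$', 'src$' ]
```

Each call to lift adds one node, and every node only knows about the one it was created from.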
So, an operator (like map) will return a function. What invokes that function is the pipeFromArray function:
export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
  if (fns.length === 0) {
    return identity as UnaryFunction<any, any>;
  }
  if (fns.length === 1) {
    return fns[0];
  }
  return function piped(input: T): R {
    // here the functions returned by the operators are being called
    return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
  };
}
In the above snippet, fn is what the operate function returns:
return (source: Observable<T>) => {
  if (hasLift(source)) { // has `lift` method
    return source.lift(function (this: Subscriber<R>, liftedSource: Observable<T>) {
      try {
        return init(liftedSource, this);
      } catch (err) {
        this.error(err);
      }
    });
  }
  throw new TypeError('Unable to lift unknown Observable type');
};
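The reduce-based composition inside piped may be easier to see with plain functions instead of operators. The following is only a sketch: pipeAll, addOne and double are made-up names, and the types are narrowed to a single T for simplicity, which the real pipeFromArray does not do.

```typescript
type UnaryFn<A, B> = (input: A) => B;

// Same reduce-based shape as `piped`: feed the result of each function into the next one.
function pipeAll<T>(fns: Array<UnaryFn<T, T>>): UnaryFn<T, T> {
  return (input: T) => fns.reduce<T>((prev, fn) => fn(prev), input);
}

const addOne = (n: number) => n + 1;
const double = (n: number) => n * 2;

const addThenDouble = pipeAll([addOne, double]);

console.log(addThenDouble(3)); // 8, i.e. double(addOne(3))
```

With operators, the value being threaded through is an Observable rather than a number, so every step receives the Observable produced by the previous operator.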
Maybe it would be better to see an example as well. I'd recommend trying this yourself with a debugger.
const src$ = new Observable(subscriber => { subscriber.next(1); subscriber.complete(); });
The subscriber => {} callback fn will be assigned to the Observable._subscribe property.
constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
  if (subscribe) {
    this._subscribe = subscribe;
  }
}
Next, let's try adding an operator:
const src2$ = src$.pipe(map(num => num ** 2))
In this case, it will invoke this block from pipeFromArray:
// `pipeFromArray`
if (fns.length === 1) {
  return fns[0];
}
// `Observable.pipe`
pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
  return operations.length ? pipeFromArray(operations)(this) : this;
}
So, Observable.pipe will invoke (source: Observable<T>) => { ... }, where source is the src$ Observable. By invoking that function (whose result is stored in src2$), it will also call the Observable.lift method.
return source.lift(function (this: Subscriber<R>, liftedSource: Observable<T>) {
  try {
    return init(liftedSource, this);
  } catch (err) {
    this.error(err);
  }
});
/* ... */
protected lift<R>(operator?: Operator<T, R>): Observable<R> {
  const observable = new Observable<R>();
  observable.source = this;
  observable.operator = operator;
  return observable;
}
At this point, src2$ is an Observable instance, which has the source set to src$ and the operator set to function (this: Subscriber<R>, liftedSource: Observable<T>) ... .
From my perspective, it's all about linked lists. When creating the Observable chain (by adding operators), the list is created from top to bottom.
When the tail node has its subscribe method called, another list will be created, this time from bottom to top. I like to call the first one the Observable list and the second one the Subscribers list.
src2$.subscribe(console.log)
This is what happens when the subscribe method is called:
const subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext, error, complete);
const { operator, source } = this;
subscriber.add(
  operator
    ? operator.call(subscriber, source)
    : source || config.useDeprecatedSynchronousErrorHandling
    ? this._subscribe(subscriber)
    : this._trySubscribe(subscriber)
);
return subscriber;
In this case src2$ has an operator, so it will call that. operator is defined as:
function (this: Subscriber<R>, liftedSource: Observable<T>) {
  try {
    return init(liftedSource, this);
  } catch (err) {
    this.error(err);
  }
}
where init depends on the operator that is used. Once again, here is map's init:
export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> {
  return operate( /* THIS IS `init()` */(source, subscriber) => {
    // The index of the value from the source. Used with projection.
    let index = 0;
    // Subscribe to the source, all errors and completions are sent along
    // to the consumer.
    source.subscribe(
      new OperatorSubscriber(subscriber, (value: T) => {
        // Call the projection function with the appropriate this context,
        // and send the resulting value to the consumer.
        subscriber.next(project.call(thisArg, value, index++));
      })
    );
  });
}
source is in fact src$. When source.subscribe() is called, it will end up calling the callback provided to new Observable(subscriber => { ... }). Calling subscriber.next(1) will call the (value: T) => { ... } from above, which will call subscriber.next(project.call(thisArg, value, index++)); (project is the callback provided to map). Lastly, subscriber.next refers to console.log.
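The whole flow described above can be reproduced in a few lines. This is a rough sketch under simplifying assumptions, not how RxJS is implemented: MiniObservable, miniMap and the trace array are hypothetical names, and there is no lift, no Subscriber class, no error handling and no teardown.

```typescript
type Observer<T> = { next: (value: T) => void; complete: () => void };
type Operator<T, R> = (source: MiniObservable<T>) => MiniObservable<R>;

class MiniObservable<T> {
  constructor(private readonly onSubscribe: (observer: Observer<T>) => void) {}

  subscribe(observer: Observer<T>): void {
    this.onSubscribe(observer);
  }

  // Single-operator version of `pipe`: hand `this` to the function the operator returned.
  pipe<R>(op: Operator<T, R>): MiniObservable<R> {
    return op(this);
  }
}

function miniMap<T, R>(project: (value: T) => R): Operator<T, R> {
  return (source) =>
    new MiniObservable<R>((downstream) => {
      // Subscribing to the result subscribes to the source (bottom to top).
      source.subscribe({
        next: (value) => downstream.next(project(value)),
        complete: () => downstream.complete(),
      });
    });
}

const trace: string[] = [];

const source$ = new MiniObservable<number>((observer) => {
  trace.push("source subscribed");
  observer.next(3);
  observer.complete();
});

// Building the chain (top to bottom) does not run the source callback yet.
const squared$ = source$.pipe(miniMap((n: number) => n ** 2));
trace.push("chain built");

squared$.subscribe({
  next: (v) => trace.push(`next ${v}`),
  complete: () => trace.push("complete"),
});

console.log(trace);
// [ 'chain built', 'source subscribed', 'next 9', 'complete' ]
```

Note that "chain built" is logged before "source subscribed": nothing happens in the source until the tail of the chain is subscribed to.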
Coming back to Subject, this is what happens when the _subscribe method is called:
protected _subscribe(subscriber: Subscriber<T>): Subscription {
  this._throwIfClosed(); // if unsubscribed
  this._checkFinalizedStatuses(subscriber); // `error` or `complete` notifications
  return this._innerSubscribe(subscriber);
}
protected _innerSubscribe(subscriber: Subscriber<any>) {
  const { hasError, isStopped, observers } = this;
  return hasError || isStopped
    ? EMPTY_SUBSCRIPTION
    : (observers.push(subscriber), new Subscription(() => arrRemove(this.observers, subscriber)));
}
So, this is how Subject's list of subscribers is populated. By returning new Subscription(() => arrRemove(this.observers, subscriber)), it ensures that when the subscriber unsubscribes (due to complete/error notifications or simply subscriber.unsubscribe()), the inactive subscriber will be removed from the Subject's list.
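Applied to the original question, the same idea might look like the sketch below: instead of capturing the observer of an inner Observable, the Subject extends the Observable and overrides _subscribe so that subscribing only registers the listener. TinyObservable, TinySubject, Listener and Unsubscribe are hypothetical names, and error/complete handling is left out, so treat it as an illustration of the pattern and not as RxJS's actual code.

```typescript
type Listener<T> = (value: T) => void;
type Unsubscribe = () => void;

class TinyObservable<T> {
  constructor(subscribe?: (listener: Listener<T>) => Unsubscribe) {
    if (subscribe) this._subscribe = subscribe;
  }

  // Default behaviour: nothing to subscribe to, nothing to clean up.
  protected _subscribe(_listener: Listener<T>): Unsubscribe {
    return () => {};
  }

  subscribe(listener: Listener<T>): Unsubscribe {
    return this._subscribe(listener);
  }
}

class TinySubject<T> extends TinyObservable<T> {
  private observers: Listener<T>[] = [];

  // Multicast: every subscriber lands in the same list.
  protected _subscribe(listener: Listener<T>): Unsubscribe {
    this.observers.push(listener);
    return () => {
      const index = this.observers.indexOf(listener);
      if (index >= 0) this.observers.splice(index, 1);
    };
  }

  next(value: T): void {
    // Iterate over a copy so that unsubscribing during emission is safe.
    this.observers.slice().forEach((observer) => observer(value));
  }
}

const subject = new TinySubject<number>();
const seenByA: number[] = [];
const seenByB: number[] = [];

const unsubscribeA = subject.subscribe((v) => seenByA.push(v));
subject.subscribe((v) => seenByB.push(v));

subject.next(1); // both listeners receive 1
unsubscribeA();
subject.next(2); // only B is still in the list

console.log(seenByA, seenByB); // [ 1 ] [ 1, 2 ]
```

An EventEmitter-style emit method would then just be a thin wrapper that forwards to next.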