In RxJava, there is Observable.OnSubscribe which will call your action when something subscribes to the observable.
Is there an equivalent in RxJs?
It seems as though they've taken the page down since asking this question. Google's cached version lives here.
An Observable is basically a function that can deliver a stream of values to an observer over time, either synchronously or asynchronously. It may emit anywhere from zero values to an unbounded number of them.
A Pipeable Operator is a pure function that takes one Observable as its input and returns another Observable as its output. The input Observable stays unmodified.
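To make these two definitions concrete, here is a minimal sketch in plain TypeScript. It deliberately does not use RxJS: `SimpleObserver`, `SimpleObservable`, `fromArray` and `mapValues` are hypothetical names chosen for illustration, and the real library adds error handling, unsubscription and scheduling on top of this idea.

```typescript
// Simplified model for illustration only; not the RxJS implementation.
type SimpleObserver<T> = { next: (value: T) => void; complete: () => void };
type SimpleObservable<T> = (observer: SimpleObserver<T>) => void;
type SimpleOperator<T, R> = (source: SimpleObservable<T>) => SimpleObservable<R>;

// An "observable" that synchronously emits each array element, then completes.
function fromArray<T>(values: T[]): SimpleObservable<T> {
  return (observer) => {
    for (const value of values) observer.next(value);
    observer.complete();
  };
}

// A pipeable-style operator: takes a source and returns a new observable.
// The source itself is never modified.
function mapValues<T, R>(project: (value: T) => R): SimpleOperator<T, R> {
  return (source) => (observer) =>
    source({
      next: (value) => observer.next(project(value)),
      complete: () => observer.complete(),
    });
}

const sourceNumbers = fromArray([1, 2, 3]);
const doubledNumbers = mapValues((n: number) => n * 2)(sourceNumbers);

const seenFromSource: number[] = [];
const seenFromDoubled: number[] = [];
sourceNumbers({ next: (v) => seenFromSource.push(v), complete: () => {} });
doubledNumbers({ next: (v) => seenFromDoubled.push(v), complete: () => {} });
// seenFromSource is [1, 2, 3]; seenFromDoubled is [2, 4, 6]
```

Subscribing to `sourceNumbers` still yields the original values, which is the "previous Observable stays unmodified" property.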
From my understanding of Angular and RxJS, there are two ways to terminate Observables: you can unsubscribe() from them, or use takeUntil() and complete().
Really like Mark's answer, but if you're using rxjs version 5 or later, I'd prefer using pure functions instead of patching the prototype.
Implemented without the empty/concat hack, using defer as suggested by Ray:
import {defer} from 'rxjs/observable/defer';
import {Observable} from 'rxjs/Observable';

/** Example
import {from} from 'rxjs/observable/from';

from([1, 2, 3])
  .pipe(doOnSubscribe(() => console.log('subscribed to stream')))
  .subscribe(x => console.log(x), null, () => console.log('completed'));
*/
export function doOnSubscribe<T>(onSubscribe: () => void): (source: Observable<T>) => Observable<T> {
  return function inner(source: Observable<T>): Observable<T> {
    return defer(() => {
      onSubscribe();
      return source;
    });
  };
}
https://gist.github.com/evxn/750702f7c8e8d5a32c7b53167fe14d8d
import {empty} from 'rxjs/observable/empty';
import {concat, tap} from 'rxjs/operators';
import {Observable} from 'rxjs/Observable';

/** Example
import {from} from 'rxjs/observable/from';

from([1, 2, 3])
  .pipe(doOnSubscribe(() => console.log('subscribed to stream')))
  .subscribe(x => console.log(x), null, () => console.log('completed'));
*/
export function doOnSubscribe<T>(callback: () => void): (source: Observable<T>) => Observable<T> {
  return function inner(source: Observable<T>): Observable<T> {
    return empty().pipe(
      tap(null, null, callback),
      concat<T>(source)
    );
  };
}
This function is easy to implement by composing existing RxJS 5 functions.
Rx.Observable.prototype.doOnSubscribe = function (onSubscribe) {
  return Rx.Observable.empty()
    .do(null, null, onSubscribe)
    .concat(this);
};

Rx.Observable.from([1, 2, 3])
  .doOnSubscribe(() => console.log('subscribed to stream'))
  .subscribe(console.log, null, () => console.log('completed'));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>
When downstream subscribes to the stream returned by doOnSubscribe, we start with an empty observable, which completes immediately, and use the onComplete callback of .do() to run the doOnSubscribe action. Then we .concat() the upstream onto this and return the combined stream.
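The ordering described above can be traced with a small sketch that does not depend on RxJS. `emptyObs`, `tapComplete` and `concatTwo` are hypothetical stand-ins for `Rx.Observable.empty()`, `.do(null, null, callback)` and `.concat()`. They model only `next` and `complete`, so treat this as an illustration of the idea rather than the library's behaviour in every case.

```typescript
// Simplified stand-ins for illustration; not the RxJS implementation.
type SimpleObserver<T> = { next: (value: T) => void; complete: () => void };
type SimpleObservable<T> = (observer: SimpleObserver<T>) => void;

// Emits each array element synchronously, then completes.
function fromArray<T>(values: T[]): SimpleObservable<T> {
  return (observer) => {
    for (const value of values) observer.next(value);
    observer.complete();
  };
}

// Completes immediately without emitting anything (models empty()).
function emptyObs<T>(): SimpleObservable<T> {
  return (observer) => observer.complete();
}

// Runs `callback` when the source completes (models .do(null, null, callback)).
function tapComplete<T>(source: SimpleObservable<T>, callback: () => void): SimpleObservable<T> {
  return (observer) =>
    source({
      next: (value) => observer.next(value),
      complete: () => {
        callback();
        observer.complete();
      },
    });
}

// Subscribes to `second` only after `first` has completed (models .concat()).
function concatTwo<T>(first: SimpleObservable<T>, second: SimpleObservable<T>): SimpleObservable<T> {
  return (observer) =>
    first({
      next: (value) => observer.next(value),
      complete: () => second(observer),
    });
}

// empty -> completion callback -> upstream, as in the prototype patch above.
function doOnSubscribe<T>(source: SimpleObservable<T>, onSubscribe: () => void): SimpleObservable<T> {
  return concatTwo(tapComplete(emptyObs<T>(), onSubscribe), source);
}

const concatLog: string[] = [];
doOnSubscribe(fromArray([1, 2, 3]), () => concatLog.push('subscribed to stream'))({
  next: (value) => concatLog.push(String(value)),
  complete: () => concatLog.push('completed'),
});
// concatLog is ['subscribed to stream', '1', '2', '3', 'completed']
```

The callback runs before any upstream value because the upstream is not subscribed to until the empty observable has completed.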
Another option that builds on Mark's answer is to use the defer factory function:
Rx.Observable.prototype.doOnSubscribe = function(onSubscribe) {
  const source = this;
  return Rx.Observable.defer(() => {
    onSubscribe();
    return source;
  });
};
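One property of the defer approach worth checking is that the callback runs once per subscription, and not when the stream is merely created. The sketch below models that in plain TypeScript without RxJS; `deferSimple` is a hypothetical stand-in for `Rx.Observable.defer`, and the types cover only `next` and `complete`.

```typescript
// Simplified stand-ins for illustration; not the RxJS implementation.
type SimpleObserver<T> = { next: (value: T) => void; complete: () => void };
type SimpleObservable<T> = (observer: SimpleObserver<T>) => void;

// Emits each array element synchronously, then completes.
function fromArray<T>(values: T[]): SimpleObservable<T> {
  return (observer) => {
    for (const value of values) observer.next(value);
    observer.complete();
  };
}

// Models defer(): the factory is invoked anew for every subscription.
function deferSimple<T>(factory: () => SimpleObservable<T>): SimpleObservable<T> {
  return (observer) => factory()(observer);
}

function doOnSubscribe<T>(source: SimpleObservable<T>, onSubscribe: () => void): SimpleObservable<T> {
  return deferSimple(() => {
    onSubscribe();
    return source;
  });
}

let subscribeCount = 0;
const deferLog: string[] = [];

const watched = doOnSubscribe(fromArray([1, 2]), () => {
  subscribeCount += 1;
  deferLog.push('subscribed');
});

// Creating the stream has not triggered the callback yet.
const countBeforeSubscribe = subscribeCount;

const loggingObserver: SimpleObserver<number> = {
  next: (value) => { deferLog.push(`value ${value}`); },
  complete: () => { deferLog.push('completed'); },
};

watched(loggingObserver);
watched(loggingObserver);
// countBeforeSubscribe is 0, subscribeCount is 2, and each subscription logs
// 'subscribed', 'value 1', 'value 2', 'completed' in that order.
```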