
Observable.onSubscribe equivalent in RxJs

Tags:

rxjs

In RxJava, there is Observable.OnSubscribe, which calls your action whenever something subscribes to the observable.

Is there an equivalent in RxJs?


It seems as though they've taken the page down since asking this question. Google's cached version lives here.

silencedmessage asked Jan 26 '17 21:01




3 Answers

I really like Mark's answer, but if you're using RxJS 5.5 or later (the first release with pipeable operators), I'd prefer a pure function over patching the prototype.

Update:

Implemented without the empty/concat hack, using defer as suggested by Ray.

import {defer} from 'rxjs/observable/defer';
import {Observable} from 'rxjs/Observable';

/** Example
import {from} from 'rxjs/observable/from';

from([1, 2, 3])
    .pipe(doOnSubscribe(() => console.log('subscribed to stream')))
    .subscribe(x => console.log(x), null, () => console.log('completed'));
*/

export function doOnSubscribe<T>(onSubscribe: () => void): (source: Observable<T>) =>  Observable<T> {
    return function inner(source: Observable<T>): Observable<T> {
        return defer(() => {
          onSubscribe();
          return source;
        });
    };
}

https://gist.github.com/evxn/750702f7c8e8d5a32c7b53167fe14d8d

Original Answer:

import {empty} from 'rxjs/observable/empty';
import {concat, tap} from 'rxjs/operators';
import {Observable} from 'rxjs/Observable';

/** Example
import {from} from 'rxjs/observable/from';
from([1, 2, 3])
    .pipe(doOnSubscribe(() => console.log('subscribed to stream')))
    .subscribe(x => console.log(x), null, () => console.log('completed'));
*/

export function doOnSubscribe<T>(callback: () => void): (source: Observable<T>) =>  Observable<T> {
    return function inner(source: Observable<T>): Observable<T> {
        return empty().pipe(
            tap(null, null, callback),
            concat<T>(source)
        );
    };
}
Evgeniy Malyutin answered Sep 18 '22 17:09


This operator is easy to implement by composing existing RxJS 5 functions.

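// Patch the prototype so every observable gets a doOnSubscribe method.
Rx.Observable.prototype.doOnSubscribe = function (onSubscribe) {
  return Rx.Observable.empty()
    // empty() completes immediately, so the complete callback runs on subscribe.
    .do(null, null, onSubscribe)
    // Then continue with the original stream.
    .concat(this);
};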

Rx.Observable.from([1,2,3])
  .doOnSubscribe(() => console.log('subscribed to stream'))
  .subscribe(console.log, null, () => console.log('completed'))
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>

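When downstream subscribes to the result of doOnSubscribe, we first subscribe to an empty observable, which completes immediately, and use the complete callback of .do() to run onSubscribe. We then .concat() the upstream onto it and return the combined stream, so the callback fires before any upstream value is emitted.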
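To make the ordering concrete, here is a minimal, dependency-free sketch of the same idea. It does not use the RxJS API: ToyObserver, ToySource, emptySource, fromArray, tapComplete and concatSources are hypothetical stand-ins that only model the relevant behaviour (an empty source that completes immediately, a complete-side effect, and concatenation).

```typescript
// Toy model of an observable: a function that pushes values into an observer.
// These names are illustrative stand-ins, NOT part of RxJS.
type ToyObserver<T> = { next: (value: T) => void; complete: () => void };
type ToySource<T> = (observer: ToyObserver<T>) => void;

// Emits nothing and completes immediately (models Rx.Observable.empty()).
function emptySource<T>(): ToySource<T> {
  return (observer) => observer.complete();
}

// Emits each item synchronously, then completes (models Rx.Observable.from([...])).
function fromArray<T>(items: T[]): ToySource<T> {
  return (observer) => {
    items.forEach((item) => observer.next(item));
    observer.complete();
  };
}

// Runs onComplete right before forwarding completion (models .do(null, null, fn)).
function tapComplete<T>(source: ToySource<T>, onComplete: () => void): ToySource<T> {
  return (observer) =>
    source({
      next: observer.next,
      complete: () => {
        onComplete();
        observer.complete();
      },
    });
}

// Subscribes to `second` only after `first` has completed (models .concat()).
function concatSources<T>(first: ToySource<T>, second: ToySource<T>): ToySource<T> {
  return (observer) =>
    first({
      next: observer.next,
      complete: () => second(observer),
    });
}

// The empty/concat trick: the callback fires when the empty prefix completes,
// which happens as soon as someone subscribes.
function doOnSubscribe<T>(source: ToySource<T>, onSubscribe: () => void): ToySource<T> {
  return concatSources(tapComplete(emptySource<T>(), onSubscribe), source);
}

const log: string[] = [];
const stream = doOnSubscribe(fromArray([1, 2, 3]), () => log.push('subscribed'));

stream({
  next: (value) => log.push(`next ${value}`),
  complete: () => log.push('completed'),
});

console.log(log.join(', '));
// prints "subscribed, next 1, next 2, next 3, completed"
```

In this model the callback is recorded before the first value because the upstream is only subscribed to once the empty prefix has completed.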

Mark van Straten answered Sep 21 '22 17:09


Another option, building on Mark's answer, is to use the defer factory function, which runs its callback on every subscription:

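Rx.Observable.prototype.doOnSubscribe = function (onSubscribe) {
  // Capture the source, because `this` is not the observable inside the arrow's caller.
  const source = this;
  // defer() invokes the factory once per subscriber, before subscribing to source.
  return Rx.Observable.defer(() => {
    onSubscribe();
    return source;
  });
};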
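The reason defer works here is that its factory is called lazily, once for each subscription. The sketch below illustrates that with a dependency-free toy model rather than RxJS itself: PushObserver, PushSource, ofValues, deferSource and doOnSubscribeDeferred are hypothetical names introduced only for this example.

```typescript
// Toy model of an observable: a function that pushes values into an observer.
// These names are illustrative stand-ins, NOT part of RxJS.
type PushObserver<T> = { next: (value: T) => void; complete: () => void };
type PushSource<T> = (observer: PushObserver<T>) => void;

// Emits the given values synchronously, then completes.
function ofValues<T>(...items: T[]): PushSource<T> {
  return (observer) => {
    items.forEach((item) => observer.next(item));
    observer.complete();
  };
}

// Models defer(): the factory runs on every subscription, and the
// subscriber is then attached to whatever source the factory returns.
function deferSource<T>(factory: () => PushSource<T>): PushSource<T> {
  return (observer) => factory()(observer);
}

// Runs onSubscribe each time someone subscribes, then hands over to source.
function doOnSubscribeDeferred<T>(
  source: PushSource<T>,
  onSubscribe: () => void
): PushSource<T> {
  return deferSource(() => {
    onSubscribe();
    return source;
  });
}

let subscriptions = 0;
const received: number[] = [];

const counted = doOnSubscribeDeferred(ofValues(1, 2), () => {
  subscriptions += 1;
});

// Building the stream alone does not trigger the callback.
const beforeSubscribe = subscriptions;

const collector: PushObserver<number> = {
  next: (value) => received.push(value),
  complete: () => {},
};

// Two separate subscriptions trigger the callback twice.
counted(collector);
counted(collector);

console.log(beforeSubscribe, subscriptions, received.join(','));
// prints "0 2 1,2,1,2"
```

Compared with the empty/concat approach, this variant needs no placeholder stream: the side effect simply lives in the factory.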
Ray Booysen answered Sep 18 '22 17:09