
Combine RxJS operators into new operator using TypeScript

I frequently find myself adding the same sequence of operators to observables, e.g.

observable$
  .do(x => console.log('some text', x))
  .publishReplay()
  .refCount();

I'm looking for a way to combine these three operators into a small reusable operator (e.g. .cache('some text')) that I can chain onto any observable. How can I define this in TypeScript so that I can import rxjs/Observable and this operator, the same way I do with the built-in RxJS operators?

Peter Albert asked Feb 03 '17 22:02



2 Answers

To implement the operator you have described, create a cache.ts file with the following content:

import { Observable } from "rxjs/Observable";
import "rxjs/add/operator/do";
import "rxjs/add/operator/publishReplay";

// Compose the operator:

function cache<T>(this: Observable<T>, text: string): Observable<T> {
  return this
    .do(x => console.log(text, x))
    .publishReplay()
    .refCount();
}

// Add the operator to the Observable prototype:

Observable.prototype.cache = cache;

// Extend the TypeScript interface for Observable to include the operator:

declare module "rxjs/Observable" {
  interface Observable<T> {
    cache: typeof cache;
  }
}

And consume it like this:

import { Observable } from "rxjs/Observable";
import "rxjs/add/observable/of";
import "./cache";

let cached = Observable.of(1).cache("some text");
cached.subscribe(x => console.log(x));
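
The three steps above (implement the function with an explicit this type, patch the prototype, extend the interface) can be tried without RxJS installed. What follows is only a minimal sketch under stated assumptions: MiniStream and tagged are hypothetical names that do not exist in RxJS, and because everything lives in one file, a same-name interface declaration stands in for the declare module block.

```typescript
// Hypothetical stand-in class, NOT RxJS's Observable: it just wraps an array.
class MiniStream<T> {
  readonly values: T[];

  constructor(values: T[]) {
    this.values = values;
  }

  forEach(fn: (value: T) => void): void {
    this.values.forEach(fn);
  }
}

// Step 1: tell the compiler about the extra method. An interface with the same
// name as the class merges with it. When the class lives in another module
// (as Observable does), this goes inside `declare module "..." { }` instead.
interface MiniStream<T> {
  tagged(text: string): MiniStream<string>;
}

// Step 2: implement it as a plain function with an explicit `this` type.
function tagged<T>(this: MiniStream<T>, text: string): MiniStream<string> {
  return new MiniStream(this.values.map(v => `${text} ${v}`));
}

// Step 3: patch the prototype so every instance can chain the method.
MiniStream.prototype.tagged = tagged;

const out: string[] = [];
new MiniStream([1, 2]).tagged("some text").forEach(v => out.push(v));
```

The explicit this parameter exists only at compile time; it lets the function body be type-checked as if it were written inside the class.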
cartant answered Oct 01 '22 19:10


cartant's answer above works well and answers the question as asked (How can I define this in Typescript, so that I could import rxjs/Observable and this operator, like I do with rxjs operators?).

I recently discovered the let operator, which lets you DRY up your code even if you don't need the function to be implemented as an actual operator.

I was starting to implement an Angular 2 service to interface with my Rails backend and knew that most of my API calls would look very similar, so I wanted to put as much of the common logic as possible into one function.

Almost all the calls will do the following:

  1. retry on an error (my function below needs more work on that front)
  2. map the HTTP response into a locally defined TypeScript class (via json-typescript-mapper)
  3. handle errors

Here is an example of how I use the rxjs let operator to pass my HTTP responses through a common function (handleResponse).

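import { Observable } from "rxjs/Observable";
import "rxjs/add/observable/throw";
import "rxjs/add/operator/catch";
import "rxjs/add/operator/let";
import "rxjs/add/operator/map";
import "rxjs/add/operator/retry";

// The methods below are members of the service class.

  // Returns a function suitable for .let(): it takes the raw HTTP
  // response stream and returns a stream of deserialized T values.
  handleResponse<T>({ klass, retries = 0 }: { klass: any, retries?: number }): (source: Observable<Response>) => Observable<T> {
    return (source: Observable<Response>): Observable<T> => {
      return source.retry(retries)
        .map((res) => this.processResponse(klass, res))
        .catch((res) => this.handleError(res));
    };
  }

  processResponse(klass, response: Response) {
    return deserialize(klass, response.json());
  }

  handleError(res: Response) {
    const error = new RailsBackendError(res.status, res.statusText);
    return Observable.throw(error);
  }

  getUserList({ page = 1, perPage = 30, retry = 0 }: { page?: number, perPage?: number, retry?: number } = {}): Observable<UserList> {
    const requestURL = `/api/v1/users/?${this.apiTokenQueryString}&page=${page}&per_page=${perPage}`;
    // Forward the caller's retry count, which would otherwise go unused.
    return this.http.get(requestURL).let(this.handleResponse<UserList>({ klass: UserList, retries: retry }));
  }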
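
The reason let is enough here is that it simply hands the whole stream to a function and returns whatever that function returns. The sketch below illustrates that shape without depending on RxJS. It rests on assumptions: TinyObservable is a hypothetical, synchronous stand-in (not the RxJS class), and logged is a made-up factory in the spirit of handleResponse.

```typescript
type Observer<T> = (value: T) => void;

// Hypothetical stand-in for an Observable; just enough to show the pattern.
class TinyObservable<T> {
  private readonly producer: (observer: Observer<T>) => void;

  constructor(producer: (observer: Observer<T>) => void) {
    this.producer = producer;
  }

  static of<T>(...values: T[]): TinyObservable<T> {
    return new TinyObservable<T>(observer => values.forEach(v => observer(v)));
  }

  subscribe(observer: Observer<T>): void {
    this.producer(observer);
  }

  // Like RxJS 5's `do`: run a side effect and pass the value through unchanged.
  do(sideEffect: (value: T) => void): TinyObservable<T> {
    return new TinyObservable<T>(observer =>
      this.subscribe(value => {
        sideEffect(value);
        observer(value);
      })
    );
  }

  map<R>(project: (value: T) => R): TinyObservable<R> {
    return new TinyObservable<R>(observer =>
      this.subscribe(value => observer(project(value)))
    );
  }

  // Like RxJS 5's `let`: apply a stream-to-stream function to this stream.
  let<R>(fn: (source: TinyObservable<T>) => TinyObservable<R>): TinyObservable<R> {
    return fn(this);
  }
}

// Hypothetical reusable step: a factory returning a stream-to-stream function.
function logged<T>(text: string, sink: string[]): (source: TinyObservable<T>) => TinyObservable<T> {
  return source => source.do(x => sink.push(`${text} ${x}`));
}

const log: string[] = [];
const received: number[] = [];

TinyObservable.of(1, 2, 3)
  .let(logged<number>("some text", log)) // the reusable step slots into the chain
  .map(x => x * 10)
  .subscribe(x => received.push(x));
```

Because the reusable piece is an ordinary function, nothing has to be added to a prototype or declared in an interface. In RxJS 6 and later the let method was removed, and observable.pipe(fn) accepts a function of the same shape.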
nPn answered Oct 01 '22 17:10