I frequently find my self adding the same sequence of operators to observables, e.g.
observable$
.do(x => console.log('some text', x))
.publishReplay()
.refCount();
I'm looking for a way to combine these 3 operators in a small reusable operator (e.g. .cache('some text')
) that I can chain to any observable. How can I define this in Typescript, so that I could import rxjs/Observable and this operator, like I do with rxjs operators?
We can use the concat operator to take multiple Observables and return a new Observable that sequentially emits values from each Observable that were passed in. It works by subscribing to them one at a time and merging the results in the output Observable.
The RxJS merge() operator is a join operator that is used to turn multiple observables into a single observable. It creates an output Observable, which concurrently emits all values from every given input Observables.
ConcatAll This operator combines all emitted inner streams and just as with plain concat sequentially produces values from each stream.
log(x)); // Logs // 1 // 4 // 9. You can use pipes to link operators together. Pipes let you combine multiple functions into a single function. The pipe() function takes as its arguments the functions you want to combine, and returns a new function that, when executed, runs the composed functions in sequence.
To implement the operator you have described, create a cache.ts
file with the following content:
import { Observable } from "rxjs/Observable";
import "rxjs/add/operator/do";
import "rxjs/add/operator/publishReplay";
// Compose the operator:
function cache<T>(this: Observable<T>, text: string): Observable<T> {
return this
.do(x => console.log(text, x))
.publishReplay()
.refCount();
}
// Add the operator to the Observable prototype:
Observable.prototype.cache = cache;
// Extend the TypeScript interface for Observable to include the operator:
declare module "rxjs/Observable" {
interface Observable<T> {
cache: typeof cache;
}
}
And consume it like this:
import { Observable } from "rxjs/Observable";
import "rxjs/add/observable/of";
import "./cache";
let cached = Observable.of(1).cache("some text");
cached.subscribe(x => console.log(x));
cartant's answer above works well, and answers the question that was asked (How can I define this in Typescript, so that I could import rxjs/Observable and this operator, like I do with rxjs operators?)
I recently discovered the let
operator which if you don't actually need to have the function implemented as an operator, will still let you DRY up your code.
I was starting on implementing an angular 2 service to interface with my rails backend and knew that most of my api calls would look very similar so I wanted to try and put as much of the common stuff in a function.
Almost all the calls will do the following:
Here is an example of my use the let
operator to my http responses through a common function (handleResponse) via the rxjs let operator.
handleResponse<T>({klass, retries=0} :{klass:any,retries?:number }) : (source: Observable<Response>) => Observable<T> {
return (source: Observable<Response>) : Observable<T> => {
return source.retry(retries)
.map( (res) => this.processResponse(klass,res))
.catch( (res) => this.handleError(res));
}
}
processResponse(klass, response: Response) {
return deserialize(klass, response.json());
}
handleError(res: Response) {
const error = new RailsBackendError(res.status, res.statusText);
return Observable.throw(error);
}
getUserList({page=1,perPage=30,retry=0}: { page?:number, perPage?:number, retry?:number }={}) : Observable<UserList> {
const requestURL = `/api/v1/users/?${this.apiTokenQueryString}&page=${page}&per_page=${perPage}`;
return this.http.get(requestURL).let(this.handleResponse<UserList>({klass: UserList}));
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With