What is pipe for in RxJS?

People also ask

What is pipe () function in angular?

Pipes let you combine multiple functions into a single function. The pipe() function takes as its arguments the functions you want to combine, and returns a new function that, when executed, runs the composed functions in sequence.
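As a rough illustration of that idea, here is a minimal sketch of a standalone pipe-style helper in plain TypeScript. It is not the RxJS implementation; the names pipeSketch, addOne and double are made up for this example, and it only handles functions whose input and output types match.

```typescript
// A unary function: one input, one output.
type Unary<A, B> = (input: A) => B;

// Hypothetical helper (not the RxJS source): combine same-typed unary
// functions into one function that runs them left to right.
function pipeSketch<T>(...fns: Array<Unary<T, T>>): Unary<T, T> {
  return (input: T): T => fns.reduce((acc, fn) => fn(acc), input);
}

const addOne: Unary<number, number> = (n) => n + 1;
const double: Unary<number, number> = (n) => n * 2;

// Nothing runs until the composed function is called.
const addOneThenDouble = pipeSketch(addOne, double);
const composedResult = addOneThenDouble(3); // (3 + 1) * 2 = 8
```

Calling pipeSketch with no functions returns a function that hands its input back unchanged, which is a convenient default for a composition helper like this.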

Does pipe create a new observable?

A Pipeable Operator is essentially a pure function which takes one Observable as input and generates another Observable as output.
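To make the "one Observable in, another Observable out" idea concrete, here is a small sketch that does not use RxJS at all. ToyObservable, fromValues, mapValues and collect are hypothetical stand-ins invented for this example; the real Observable class does far more (completion, errors, unsubscription).

```typescript
// Toy stand-in for an Observable (an assumption for illustration, not the
// RxJS class): a function that pushes values to a listener when subscribed.
type ToyObservable<T> = (listener: (value: T) => void) => void;

// A pipeable-style operator: takes one ToyObservable, returns a new one.
type ToyOperator<T, R> = (source: ToyObservable<T>) => ToyObservable<R>;

// Hypothetical creation helper that emits the given values in order.
const fromValues = <T>(...items: T[]): ToyObservable<T> =>
  (listener) => items.forEach((item) => listener(item));

// Hypothetical map operator: builds a new ToyObservable around the source
// and never mutates the source itself.
const mapValues = <T, R>(project: (value: T) => R): ToyOperator<T, R> =>
  (source) => (listener) => source((value) => listener(project(value)));

// Helper that subscribes and gathers everything emitted into an array.
const collect = <T>(source: ToyObservable<T>): T[] => {
  const received: T[] = [];
  source((value) => received.push(value));
  return received;
};

const numbers$ = fromValues(1, 2, 3);
const plusOne$ = mapValues((n: number) => n + 1)(numbers$);

const fromDerived = collect(plusOne$);  // [2, 3, 4]
const fromOriginal = collect(numbers$); // [1, 2, 3], the source is unchanged
```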

Why we use pipe with subscribe in angular?

The pipe method is for chaining observable operators, and subscribe is for activating the observable and listening for emitted values. The pipe method was introduced so that bundlers such as webpack can drop unused operators from the final JavaScript bundle, which makes it easier to build smaller files.

What is pipe tap RxJS?

The RxJS tap() operator is a utility operator that returns an observable identical to the source observable, but performs a side effect for every emission on the source observable.
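The behaviour described here (same values out, plus a side effect per emission) can be sketched without RxJS. Emitter and tapSketch below are hypothetical names for this example only, not the library's types or its tap implementation.

```typescript
// Toy stand-in for an Observable (illustrative assumption, not RxJS).
type Emitter<T> = (listener: (value: T) => void) => void;

// Hypothetical tap-like operator: runs sideEffect for each value, then
// forwards the very same value to the listener.
const tapSketch = <T>(sideEffect: (value: T) => void) =>
  (source: Emitter<T>): Emitter<T> =>
    (listener) => source((value) => {
      sideEffect(value);
      listener(value);
    });

const letters$: Emitter<string> = (listener) =>
  ["a", "b", "c"].forEach((letter) => listener(letter));

// The side effect here appends to a log array instead of printing.
const auditLog: string[] = [];
const tapped$ = tapSketch((letter: string) => { auditLog.push(`saw ${letter}`); })(letters$);

const delivered: string[] = [];
tapped$((letter) => { delivered.push(letter); });
// delivered: ["a", "b", "c"]              (same values as the source)
// auditLog:  ["saw a", "saw b", "saw c"]  (one side effect per emission)
```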


The "pipeable" (formerly "lettable") operators are the current and recommended way of using operators since RxJS 5.5.

I strongly recommend reading the official documentation on pipeable operators.

The main differences are that it's easier to make custom operators and that the result is more tree-shakeable, all without altering a global Observable object, which could cause collisions if two different parties wanted to create an operator of the same name.

Using a separate import statement for each operator, such as 'rxjs/add/operator/first', was a way to make smaller app bundles. By importing only the operators you need instead of the entire RxJS library, you can significantly reduce the total bundle size. However, the compiler can't know whether you imported 'rxjs/add/operator/first' because you really need it in your code or because you just forgot to remove it when refactoring. That's one of the advantages of pipeable operators: unused imports are ignored automatically.


The pipe method

According to the original documentation:

A pipeable operator is a function that takes an observable as its input and returns another observable. The previous observable stays unmodified.

pipe(...fns: UnaryFunction<any, any>[]): UnaryFunction<any, any>

Original Post

What does pipe mean?

It means that any operators you previously used on the instance of an observable are available as pure functions under rxjs/operators. This makes building a composition of operators, or re-using operators, really easy, without having to resort to all sorts of programming gymnastics where you have to create a custom observable extending Observable, then override lift just to make your own custom thing.

// RxJS 5.5-style imports, as in the original post.
const { Observable } = require('rxjs/Rx')
const { filter, map, reduce } = require('rxjs/operators')

// Each operator call returns a reusable function from Observable to Observable.
const filterOutEvens = filter(x => x % 2)             // keeps odd values only
const doubleByValue = x => map(value => value * x)    // operator factory: multiply by x
const sumValue = reduce((acc, next) => acc + next, 0) // emits one total on completion
const source$ = Observable.range(0, 10)               // emits 0 through 9

source$.pipe(
  filterOutEvens,
  doubleByValue(2),
  sumValue)
  .subscribe(console.log); // 50

What is the difference? As you can see in your example, the main difference is improved readability of the source code. There are only two functions in your example, but imagine there were a dozen. Then it would look like

function1().function2().function3().function4()

This quickly gets ugly and difficult to read, especially once you fill in the arguments of each function. On top of that, lint rules often cap the line length (Angular projects commonly use a 140-character limit), so a long chain has to be broken up anyway. Compare that with the following:

observable.pipe(
  function1(),
  function2(),
  function3(),
  function4()
)

This drastically improves the readability.

If there is no difference, why does the pipe function exist? The purpose of pipe() is to group together all the functions that take and return an observable. It takes an observable initially, and each function inside pipe() then works on the observable produced by the one before it.

The first function takes the observable, processes it, and passes the resulting observable to the next function. The next function takes that output, processes it, and passes it on, and so on until every function inside pipe() has run and you have the final processed observable. At the end you call subscribe() on it to receive the values. Remember, the original observable and its values are not changed!

Why do those functions need different imports? The import path depends on where the function is defined in the rxjs package. All packages are stored in the node_modules folder of an Angular project, and an import has the general form: import { name } from "module";

Let's take the following code as an example. I have just written it in StackBlitz, so nothing is automatically generated or copied from somewhere else. I don't see the point of copying what is stated in the RxJS documentation when you can go and read it yourself. I assume you asked this question here because you didn't understand the documentation.

  • pipe, Observable, of and map are imported from their respective modules.
  • In the body of the class, I used the pipe() method as seen in the code.
  • The of() function returns an Observable that emits the given numbers in sequence once it is subscribed to.

  • The Observable isn't subscribed to yet.

  • When you call pipe() on an Observable, the pipe() method uses that Observable as its input.

  • The first function, map(), takes that Observable, processes it, and returns the processed Observable to pipe(),

  • then that processed Observable is given to the next function, if there is one,

  • and so on until all the functions have processed the Observable.

  • At the end, pipe() returns the resulting Observable, which is assigned to a variable; in the following example it's obs.

The thing about an Observable is that it doesn't emit any value as long as no observer has subscribed to it. So I used the subscribe() function to subscribe to this Observable. As soon as I subscribe, the of() function starts emitting values, they are processed through pipe(), and you get the final result at the end. For instance, 1 is emitted by of(), map() adds 1 to it, and the result 2 is passed on. You receive that value as the argument of the callback given to subscribe( function (argument) {} ).

If you want to print the value, use:

subscribe( function (argument) {
    console.log(argument)
  }
)

The complete component looks like this:

    import { Component, OnInit } from '@angular/core';
    import { pipe } from 'rxjs';
    import { Observable, of } from 'rxjs';
    import { map } from 'rxjs/operators';

    @Component({
      selector: 'my-app',
      templateUrl: './app.component.html',
      styleUrls: [ './app.component.css' ]
    })
    export class AppComponent implements OnInit {

      // Building the chain does not emit anything yet.
      obs = of(1, 2, 3).pipe(
        map(x => x + 1),
      );

      constructor() { }

      ngOnInit() {
        // Subscribing starts the emissions: logs 2, 3, 4.
        this.obs.subscribe(value => console.log(value))
      }
    }

https://stackblitz.com/edit/angular-ivy-plifkg
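The laziness described above (nothing is emitted until subscribe() is called) can be sketched without Angular or RxJS. LazySource, lazyOf and lazyMap are hypothetical names made up for this illustration; they only mimic the subscribe-triggers-the-producer behaviour of of() and map().

```typescript
// Toy stand-in for a cold Observable (illustrative assumption, not RxJS):
// the producer code only runs when subscribe is called.
interface LazySource<T> {
  subscribe(listener: (value: T) => void): void;
}

let producerRuns = 0;

// Hypothetical of()-like helper that counts how often its producer runs.
function lazyOf<T>(...items: T[]): LazySource<T> {
  return {
    subscribe(listener) {
      producerRuns += 1;
      items.forEach((item) => listener(item));
    },
  };
}

// Hypothetical map-like step that wraps the source without subscribing to it.
function lazyMap<T, R>(source: LazySource<T>, project: (value: T) => R): LazySource<R> {
  return {
    subscribe(listener) {
      source.subscribe((value) => listener(project(value)));
    },
  };
}

const obsSketch = lazyMap(lazyOf(1, 2, 3), (x) => x + 1);
const runsBeforeSubscribe = producerRuns; // 0: building the chain emitted nothing

const printed: number[] = [];
obsSketch.subscribe((value) => printed.push(value));
// printed is now [2, 3, 4] and producerRuns is 1
```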


A good summary I've come up with is:

It decouples the streaming operations (map, filter, reduce...) from the core functionality (subscribing, piping). By piping operations instead of chaining, it doesn't pollute the prototype of Observable, making it easier to do tree shaking.

See https://github.com/ReactiveX/rxjs/blob/master/doc/pipeable-operators.md#why
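The prototype pollution mentioned in that summary can be illustrated with a toy class. TinyStream, doubled and doubledOp are invented for this sketch and are not part of RxJS; the point is only that a patched method becomes visible on every instance, while a standalone function affects nothing until someone imports and calls it.

```typescript
// Toy class standing in for Observable (illustrative assumption, not RxJS).
class TinyStream {
  items: number[];
  constructor(items: number[]) {
    this.items = items;
  }
}

// Patched style: some library adds a method to the shared prototype.
// Every TinyStream in the program now carries it, wanted or not.
const sharedPrototype = TinyStream.prototype as unknown as Record<string, unknown>;
sharedPrototype["doubled"] = function (this: TinyStream): TinyStream {
  return new TinyStream(this.items.map((n) => n * 2));
};

// Pipeable style: a plain function that a bundler can drop when unused.
const doubledOp = (source: TinyStream): TinyStream =>
  new TinyStream(source.items.map((n) => n * 2));

const unrelated = new TinyStream([1, 2]);
const leakedMethod = "doubled" in unrelated;    // true: the patch reached this instance
const viaFunction = doubledOp(unrelated).items; // [2, 4]
```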

Problems with the patched operators for dot-chaining are:

Any library that imports a patch operator will augment the Observable.prototype for all consumers of that library, creating blind dependencies. If the library removes their usage, they unknowingly break everyone else. With pipeables, you have to import the operators you need into each file you use them in.

Operators patched directly onto the prototype are not "tree-shakeable" by tools like rollup or webpack. Pipeable operators will be, as they are just functions pulled in from modules directly.

Unused operators that are being imported in apps cannot be detected reliably by any sort of build tooling or lint rule. That means that you might import scan, but stop using it, and it's still being added to your output bundle. With pipeable operators, if you're not using it, a lint rule can pick it up for you.

Functional composition is awesome. Building your own custom operators becomes much, much easier, and now they work and look just like all other operators from rxjs. You don't need to extend Observable or override lift anymore.
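As a rough sketch of why custom operators become easy, the following builds one out of two smaller operators using nothing but function composition. Feed, Op, keepIf, transform and doubleTheOdds are hypothetical names for this example; with RxJS itself the same shape would be written with filter and map.

```typescript
// Toy stand-ins for Observable and operator types (assumptions, not RxJS).
type Feed<T> = (listener: (value: T) => void) => void;
type Op<T, R> = (source: Feed<T>) => Feed<R>;

// Hypothetical filter-like operator.
const keepIf = <T>(test: (value: T) => boolean): Op<T, T> =>
  (source) => (listener) => source((value) => {
    if (test(value)) listener(value);
  });

// Hypothetical map-like operator.
const transform = <T, R>(project: (value: T) => R): Op<T, R> =>
  (source) => (listener) => source((value) => listener(project(value)));

// A custom operator is just a function built from existing operators:
// no subclassing and no overriding of lift.
const doubleTheOdds: Op<number, number> = (source) =>
  transform((n: number) => n * 2)(keepIf((n: number) => n % 2 === 1)(source));

// Emits 0 through 9, like the range(0, 10) example earlier on this page.
const zeroToNine$: Feed<number> = (listener) => {
  for (let i = 0; i < 10; i += 1) listener(i);
};

let total = 0;
doubleTheOdds(zeroToNine$)((value) => { total += value; });
// total is 50: (1 + 3 + 5 + 7 + 9) * 2
```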