Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Chaining Observables in RxJS

I'm learning RxJS and Angular 2. Let's say I have a promise chain with multiple async function calls which depend on the previous one's result which looks like:

var promiseChain = new Promise((resolve, reject) => {   setTimeout(() => {     resolve(1);   }, 1000); }).then((result) => {   console.log(result);    return new Promise((resolve, reject) => {     setTimeout(() => {       resolve(result + 2);     }, 1000);   }); }).then((result) => {   console.log(result);    return new Promise((resolve, reject) => {       setTimeout(() => {       resolve(result + 3);         }, 1000);   }); });  promiseChain.then((finalResult) => {   console.log(finalResult); }); 

My attempts at doing the same solely using RxJS without the use of promises produced the following:

var observableChain = Observable.create((observer) => {   setTimeout(() => {     observer.next(1);     observer.complete();   }, 1000); }).flatMap((result) => {   console.log(result);    return Observable.create((observer) => {     setTimeout(() => {       observer.next(result + 2);       observer.complete()     }, 1000);   }); }).flatMap((result) => {   console.log(result);    return Observable.create((observer) => {     setTimeout(() => {       observer.next(result + 3);       observer.complete()     }, 1000);   }); });  observableChain.subscribe((finalResult) => {   console.log(finalResult); }); 

It yields the same output as the promise chain. My questions are

  1. Am I doing this right? Are there any RxJS related improvements that I can make to the above code

  2. How do I get this observable chain to execute repeatedly? i.e. Adding another subscription at the end just produces an additional 6 though I expect it to print 1, 3 and 6.

    observableChain.subscribe((finalResult) => { console.log(finalResult); });

    observableChain.subscribe((finalResult) => { console.log(finalResult); });

    1 3 6 6

like image 252
Harindaka Avatar asked Jun 12 '16 07:06

Harindaka


People also ask

How do I combine two Observables RxJS?

The RxJS merge() operator is a join operator that is used to turn multiple observables into a single observable. It creates an output Observable, which concurrently emits all values from every given input Observables.

What is FlatMap in RxJS?

The FlatMap operator transforms an Observable by applying a function that you specify to each item emitted by the source Observable, where that function returns an Observable that itself emits items. FlatMap then merges the emissions of these resulting Observables, emitting these merged results as its own sequence.


1 Answers

About promise composition vs. Rxjs, as this is a frequently asked question, you can refer to a number of previously asked questions on SO, among which :

  • How to do the chain sequence in rxjs
  • RxJS Promise Composition (passing data)
  • RxJS sequence equvalent to promise.then()?

Basically, flatMap is the equivalent of Promise.then.

For your second question, do you want to replay values already emitted, or do you want to process new values as they arrive? In the first case, check the publishReplay operator. In the second case, standard subscription is enough. However you might need to be aware of the cold. vs. hot dichotomy depending on your source (cf. Hot and Cold observables : are there 'hot' and 'cold' operators? for an illustrated explanation of the concept)

like image 183
user3743222 Avatar answered Sep 28 '22 03:09

user3743222