Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

What are use cases for mergeMap operator?

I don't understand the purpose of mergeMap at all. I have heard are two explanations:

  1. It's like SelectAll() in .NET LINQ - nope.
  2. It's a combination of RxJS merge and map - nope (or I can't replicate this).

Consider the following code:

    var obs1 = new Rx.Observable.interval(1000);     var obs2 = new Rx.Observable.interval(1000);          //Just a merge and a map, works fine     obs1.merge(obs2).map(x=> x+'a').subscribe(       next => console.log(next)     )          //Who know what - seems to do the same thing as a plain map on 1 observable     obs1.mergeMap(val => Rx.Observable.of(val + `B`))         .subscribe(           next => console.log(next)         ) 

The last piece labelled "Who knows what" does nothing more than a map on obs1 - what's the point?

What does mergeMap actually do? What is an example of a valid use case? (Preferably with some code)

Articles that didn't help me at all (mergeMap code from above is from one of these): 1, 2

like image 537
VSO Avatar asked Feb 08 '17 18:02

VSO


People also ask

What is mergeMap used for?

The mergeMap() operator is also called flatMap. This operator is best to use when you want to flatten an inner observable and manually control the number of inner subscriptions. The RxJS mergeMap() operator maintains multiple active inner subscriptions at once.

When should we use the switchMap mergeMap and concatMap in RxJS?

Use mergeMap if you simply want to flatten the data into one Observable, use switchMap if you need to flatten the data into one Observable but only need the latest value and use concatMap if you need to flatten the data into one Observable and the order is important to you.

When would you use a switchMap?

switchMap could be used — when a new search is made, pending results are no longer needed; and. exhaustMap should not be used — searches for new, partial addresses should not be ignored.

What is the difference between switchMap and mergeMap?

So here's the simple difference — switchMap cancels previous HTTP requests that are still in progress, while mergeMap lets all of them finish. In my case, I needed all requests to go through, as this is a metrics service that's supposed to log all actions that the user performs on the web page, so I used mergeMap .


1 Answers

tl;dr; mergeMap is way more powerful than map. Understanding mergeMap is the necessary condition to access full power of Rx.


similarities

  • both mergeMap and map acts on a single stream (vs. zip, combineLatest)

  • both mergeMap and map can transform elements of a stream (vs. filter, delay)

differences

map

  • cannot change size of the source stream (assumption: map itself does not throw); for each element from source exactly one mapped element is emitted; map cannot ignore elements (like for example filter);

  • in case of the default scheduler the transformation happens synchronously; to be 100% clear: the source stream may deliver its elements asynchronously, but each next element is immediately mapped and re-emitted further; map cannot shift elements in time like for example delay

  • no restrictions on return values

  • id: x => x

mergeMap

  • can change size of the source stream; for each element there might be arbitrary number (0, 1 or many) of new elements created/emitted

  • it offers full control over asynchronicity - both when new elements are created/emitted and how many elements from the source stream should be processed concurrently; for example assume source stream emitted 10 elements but maxConcurrency is set to 2 then two first elements will be processed immediately and the rest 8 buffered; once one of the processed completed the next element from source stream will be processed and so on - it is bit tricky, but take a look at the example below

  • all other operators can be implemented with just mergeMap and Observable constructor

  • may be used for recursive async operations

  • return values has to be of Observable type (or Rx has to know how to create observable out of it - e.g. promise, array)

  • id: x => Rx.Observable.of(x)

array analogy

let array = [1,2,3] fn             map                    mergeMap x => x*x       [1,4,9]                error /*expects array as return value*/ x => [x,x*x]   [[1,1],[2,4],[3,9]]    [1,1,2,4,3,9] 

The analogy does not show full picture and it basically corresponds to .mergeMap with maxConcurrency set to 1. In such a case elements will be ordered as above, but in general case it does not have to be so. The only guarantee we have is that emission of new elements will be order by their position in the underlying stream. For example: [3,1,2,4,9,1] and [2,3,1,1,9,4] are valid, but [1,1,4,2,3,9] is not (since 4 was emitted after 2 in the underlying stream).

A couple of examples using mergeMap:

// implement .map with .mergeMap Rx.Observable.prototype.mapWithMergeMap = function(mapFn) {   return this.mergeMap(x => Rx.Observable.of(mapFn(x))); }  Rx.Observable.range(1, 3)   .mapWithMergeMap(x => x * x)   .subscribe(x => console.log('mapWithMergeMap', x))  // implement .filter with .mergeMap Rx.Observable.prototype.filterWithMergeMap = function(filterFn) {   return this.mergeMap(x =>     filterFn(x) ?     Rx.Observable.of(x) :     Rx.Observable.empty()); // return no element }  Rx.Observable.range(1, 3)   .filterWithMergeMap(x => x === 3)   .subscribe(x => console.log('filterWithMergeMap', x))  // implement .delay with .mergeMap  Rx.Observable.prototype.delayWithMergeMap = function(delayMs) {   return this.mergeMap(x =>     Rx.Observable.create(obs => {       // setTimeout is naive - one should use scheduler instead       const token = setTimeout(() => {         obs.next(x);         obs.complete();       }, delayMs)       return () => clearTimeout(token);     })) }  Rx.Observable.range(1, 3)   .delayWithMergeMap(500)   .take(2)   .subscribe(x => console.log('delayWithMergeMap', x))  // recursive count const count = (from, to, interval) => {   if (from > to) return Rx.Observable.empty();   return Rx.Observable.timer(interval)     .mergeMap(() =>       count(from + 1, to, interval)       .startWith(from)) }  count(1, 3, 1000).subscribe(x => console.log('count', x))  // just an example of bit different implementation with no returns const countMoreRxWay = (from, to, interval) =>   Rx.Observable.if(     () => from > to,     Rx.Observable.empty(),     Rx.Observable.timer(interval)     .mergeMap(() => countMoreRxWay(from + 1, to, interval)       .startWith(from)))  const maxConcurrencyExample = () =>   Rx.Observable.range(1,7)     .do(x => console.log('emitted', x))     .mergeMap(x => Rx.Observable.timer(1000).mapTo(x), 2)     .do(x => console.log('processed', x))     .subscribe()  setTimeout(maxConcurrencyExample, 3100)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.1/Rx.min.js"></script>
like image 170
artur grzesiak Avatar answered Sep 19 '22 00:09

artur grzesiak