
Merging unknown number of observables with RxJS

I am currently trying to create a small project to demo Reactive Programming with RxJS. The goal is to show my colleagues that this thing is out there, and it is worth looking into. I'm not experienced with the framework, so that makes things complicated.

I am trying to expand another demo of mine to use RxJS. It is not a very complicated demo: I can add any number of small forms, each of which produces a number calculated by a small formula, and a button sums up the values of all the forms.

Getting the formula to calculate inside the forms is easy, but I figured I could go further.

I want to have the sum operation done automatically through a merged observable. The only solution I figured out is something along these lines:

//dummy observables
var s1 = Rx.Observable.interval(100);
var s2 = Rx.Observable.interval(200);

//Combine the observables
var m = s1.combineLatest(s2, function(x,y){return x+y});

//Subscribe to the combined observable
var sub = m.subscribe(function(x){console.log(x)});
//A new observable is created
var s3 = Rx.Observable.interval(300);
//Now I need to update all my subscriptions, which is a pain.
m = m.combineLatest(s3, function(x,y){return x+y});
sub.dispose();
sub = m.subscribe(function(x){console.log(x)});

I figured I could have another observable notify my subscriptions to update themselves, since requiring me to know how every subscriber works would render the whole architecture useless. But this sounds like overkill for a task like this, and I don't just mean the demo: I can't really imagine an everyday real-world case where such an architecture would be cleaner than watching for any change and "actively" getting the calculated values from my forms.

I'd probably do the active getting and summing of values inside the module that handles the forms, and expose the "m" observable for the outside world to subscribe to, pushing my values into it from inside the module.

Would this be a correct approach? I'd think yes, because they are owned by my module, I'm supposed to have full control over what is happening to them, but I'm really interested in what more experienced people think about this.
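
For illustration, the module-owned approach described above could look roughly like the sketch below. It is dependency-free TypeScript rather than RxJS, and every name in it (`FormSumModule`, `setValue`, `removeForm`, `subscribe`) is hypothetical. In an RxJS version the listener list would presumably be replaced by a Subject that the module pushes into.

```typescript
// Hypothetical sketch: the module owns the form values and pushes the sum outward.
type Listener = (sum: number) => void;

class FormSumModule {
  private values: { [formId: string]: number } = {}; // latest value per form
  private listeners: Listener[] = [];

  // Outside code subscribes once; the returned function unsubscribes.
  subscribe(listener: Listener): () => void {
    this.listeners.push(listener);
    listener(this.sum()); // replay the current sum to the new subscriber
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }

  // Called from inside the module whenever a new or existing form yields a value.
  setValue(formId: string, value: number): void {
    this.values[formId] = value;
    this.publish();
  }

  removeForm(formId: string): void {
    if (formId in this.values) {
      delete this.values[formId];
      this.publish();
    }
  }

  private sum(): number {
    return Object.keys(this.values)
      .reduce((total, id) => total + this.values[id], 0);
  }

  private publish(): void {
    const total = this.sum();
    this.listeners.slice().forEach(l => l(total));
  }
}

const forms = new FormSumModule();
const seen: number[] = [];
const unsubscribe = forms.subscribe(sum => seen.push(sum));
forms.setValue('form-1', 2);
forms.setValue('form-2', 5);
forms.setValue('form-3', 10);  // a form added later needs no resubscription
forms.setValue('form-1', 4);
forms.removeForm('form-2');
unsubscribe();
forms.setValue('form-3', 100); // not observed after unsubscribing
console.log(seen);             // [ 0, 2, 7, 17, 19, 14 ]
```

Subscribers never learn how many forms exist, so adding a form does not force them to dispose and resubscribe.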

Robert asked Oct 19 '22 04:10


2 Answers

I know I'm late to this party, but I just had this exact question and could not find a suitable answer. I figured out how to do this in the context of my application, so whether or not this is still an open issue for you, I'll post the answer here for posterity.

To set up my problem, I have an Angular form with a number of controls. Anytime the validation state changes on any one of the controls, I need to do something. However, controls can be dynamically added to the form later, and I still need to listen for validation changes on those individual controls as well.

I'll try to alter my approach a little to fit the question's context. Basically, yes: the right approach here is to have an outer Observable signal when the inner Observables should update their subscriptions. This is an acceptable architecture and, frankly, one of the reasons Reactive Extensions is so powerful. Here's how I did it using RxJS 6.2.2 and TypeScript 3.0.1. Note that this method might not have been available when the question was asked or previously answered.

import { map, mergeMap, delay } from 'rxjs/operators';
import { interval, Subject, Observable } from 'rxjs';

// utility function to create sample observables
const createObs = (time: number, i: string) => interval(time).pipe(map(val => `${i}: ${val}`))

// create static sample observables. 
// these could represent the value changes of known form elements
const obsA = createObs(1000, 'A'); // emit every 1 second
const obsB = createObs(2500, 'B'); // emit every 2.5 seconds

// create a Subject which will represent dynamically adding a new stream of form values to the calculation
const formChanges = new Subject<Observable<string>>();

// this will hold the accumulated results
const results: string[] = [];

// subscribe to the subject
// each time any of the inner observables emits a new value, this will log the latest value of all existing inner observables
formChanges.pipe(
  mergeMap(innerObs => innerObs, (outerValue, innerValue, outerIndex, innerIndex) => {
    results[outerIndex] = innerValue;
    return results;
  })
).subscribe(console.log);

// emit the known form value change streams
formChanges.next(obsA);
formChanges.next(obsB);

// this will represent adding some other stream of values to the calculation later
// delay(5000) shifts every emission by 5 seconds, so the first value arrives after 6.75 seconds
formChanges.next(createObs(1750, 'C').pipe(delay(5000)));

// Output
// [ 'A: 0' ]
// [ 'A: 1' ]
// [ 'A: 1', 'B: 0' ]
// [ 'A: 2', 'B: 0' ]
// [ 'A: 3', 'B: 0' ]
// [ 'A: 3', 'B: 1' ]
// [ 'A: 4', 'B: 1' ]
// [ 'A: 5', 'B: 1' ]
// [ 'A: 5', 'B: 1', 'C: 0' ]
// [ 'A: 6', 'B: 1', 'C: 0' ]
// [ 'A: 6', 'B: 2', 'C: 0' ]
// [ 'A: 7', 'B: 2', 'C: 0' ]
// [ 'A: 7', 'B: 2', 'C: 1' ]
// [ 'A: 8', 'B: 2', 'C: 1' ]
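
The example above collects labelled strings. For the question's use case each inner observable would emit a number and the array would be summed instead. Here is a minimal sketch of that last step, assuming numeric values; `sumLatest` is a hypothetical helper, not part of RxJS. Slots belonging to forms that have not emitted yet are skipped.

```typescript
// Hypothetical helper: sums the latest value of every form that has emitted so far.
// `latest` plays the role of the `results` array above, holding numbers instead of
// strings; slots that were never assigned read as undefined and are skipped.
function sumLatest(latest: Array<number | undefined>): number {
  let total = 0;
  for (let i = 0; i < latest.length; i++) {
    const value = latest[i];
    if (value !== undefined) {
      total += value;
    }
  }
  return total;
}

const latest: Array<number | undefined> = [];
latest[0] = 3;  // form A has emitted 3
latest[2] = 4;  // form C emitted before form B, leaving a hole at index 1
console.log(sumLatest(latest)); // 7
latest[1] = 10; // form B finally emits
console.log(sumLatest(latest)); // 17
```

Under these assumptions, the result selector passed to `mergeMap` could return `sumLatest(results)` rather than `results`.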
Seth answered Oct 27 '22 10:10


I don't think you will find an operator that will directly do what you need.

There is nothing wrong with crafting your own operator though:

var source; // assign an observable of observables of form data here

Rx.Observable.prototype.combineLatestObservable = function(resultSelector) {
  var source = this;
  return Rx.Observable.create(function(obs) {
    var disposable = new Rx.SerialDisposable();
    var sources = [];
    var outer = source.subscribe(
      function(x) {
        //Update the set of observables
        sources.push(x);
        //This will dispose of the previous subscription first
        //then subscribe to the new set.
        disposable.setDisposable(Rx.Observable.combineLatest(sources, resultSelector)
                                              .subscribe(obs));
      },
      function(e) { obs.onError(e); },
      function() { obs.onCompleted(); });
    //Dispose of both the outer and the current inner subscription on unsubscribe
    return new Rx.CompositeDisposable(outer, disposable);
  }).share();
};

Or if you wanted to do it with operators:

//Have to use arguments since we don't know how many values we will have
function sums() {
  var sum = 0;
  for (var i = 0, len = arguments.length; i < len; ++i) { 
    sum += arguments[i]; 
  }
  return sum;
}

source
//Capture the latest set of Observables
.scan([], function(acc, x) {
  acc.push(x);
  return acc;
})
//Dispose of the previous set and subscribe to the new set
.flatMapLatest(function(arr) {
  return Rx.Observable.combineLatest(arr, sums);
})
//Don't know how many subscribers you have but probably want to keep from 
//recreating this stream for each
.share();
paulpdaniels answered Oct 27 '22 09:10