I'm trying to use forkJoin
on two Observables. One of them starts as a stream... If I subscribe to them directly I get a response, forkJoin
isn't firing though. Any ideas?
private data$: Observable<any[]>; private statuses$: Observable<any[]>; private queryStream = new Subject<string>(); .... this.data$ = this.queryStream .startWith('') .flatMap(queryInput => { this.query = queryInput return this._companyService.getCompanies(this.queryRequired + ' ' + this.query, this.page, this.sort); }) .share(); ... Observable.forkJoin(this.statuses$, this.companies$) .subscribe(res => { console.log('forkjoin'); this._countStatus(res[0], res[1]); }); // This shows arrays in the console... this.statuses$.subscribe(res => console.log(res)); this.companies$.subscribe(res => console.log(res)); // In the console Array[9] Array[6]
The RxJS forkJoin() operator is a join operator that accepts an Array of ObservableInput or a dictionary Object of ObservableInput and returns an Observableand then waits for the Observables to complete and then combine last values they emitted.
forkJoin Improvements Moreover, there is one deprecation — forkJoin(a, b, c, d) should no longer be used; Instead, pass an array such as forkJoin([a, b, c, d]) .
concat() which will handle each observable in sequence.
forkJoin will wait for all passed observables to emit and complete and then it will emit an array or an object with last values from corresponding observables.
forkJoin
emits only when all inner observables have completed. If you need an equivalent of forkJoin
that just listens to a single emission from each source, use combineLatest
+ take(1)
combineLatest( this.statuses$, this.companies$, ) .pipe( take(1), ) .subscribe(([statuses, companies]) => { console.log('forkjoin'); this._countStatus(statuses, companies); });
As soon as both sources emit, combineLatest
will emit and take(1)
will unsubscribe immediately after that.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With