
ForkJoin 2 BehaviorSubjects

I have two BehaviorSubject streams that I'm trying to combine with forkJoin, with no luck. I expected it to give back the last value of each stream. Is it possible to implement this somehow?

The subscribe callback is never called.

let stream1 = new BehaviorSubject(2);
let stream2 = new BehaviorSubject('two');

Observable.forkJoin(stream1, stream2)
    .subscribe(r => {
         console.log(r);
    });
Lajos asked Sep 27 '16 10:09



2 Answers

Note what forkJoin() actually does, according to its documentation:

Wait for Observables to complete and then combine last values they emitted.

This means that forkJoin() emits a value only once all input Observables have completed. A BehaviorSubject never completes on its own, so you have to call complete() explicitly on both of them:

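import { BehaviorSubject, forkJoin } from 'rxjs';

const stream1 = new BehaviorSubject(2);
const stream2 = new BehaviorSubject('two');

// Pass the sources as an array; forkJoin emits once, after every source completes.
forkJoin([stream1, stream2])
  .subscribe(r => {
    console.log(r); // [2, 'two']
  });

// Nothing is logged until both subjects have completed.
stream1.complete();
stream2.complete();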

See live demo: https://stackblitz.com/edit/rxjs-9nqtx6

March 2019: Updated for RxJS 6.
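
To make the "last values" part concrete, here is a minimal sketch (the names count$, label$ and results are made up for illustration) that pushes new values into both subjects before completing them. It assumes RxJS 6 or later, where forkJoin accepts an array of sources:

```typescript
import { BehaviorSubject, forkJoin } from 'rxjs';

// Hypothetical subjects, named for this illustration only.
const count$ = new BehaviorSubject<number>(2);
const label$ = new BehaviorSubject<string>('two');

// Collect everything forkJoin emits so it can be inspected afterwards.
const results: Array<[number, string]> = [];
forkJoin([count$, label$]).subscribe(pair => results.push(pair));

// New values replace the initial ones, but forkJoin stays silent for now.
count$.next(3);
label$.next('three');
const emittedBeforeComplete = results.length;

// Only once both sources complete does forkJoin emit the last value of each.
count$.complete();
label$.complete();

console.log(emittedBeforeComplete); // 0
console.log(results);               // [[3, 'three']]
```

The initial values 2 and 'two' never reach the subscriber here, because forkJoin keeps only the most recent value of each source at completion time.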

martin answered Oct 12 '22 14:10


You can either pipe each stream through take(1) or call complete() as mentioned above.

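import { BehaviorSubject, Observable, forkJoin } from 'rxjs';
import { take, takeUntil } from 'rxjs/operators';

// Inside a component or service class:
private subjectStream1 = new BehaviorSubject<any>(null);
stream1$: Observable<any> = this.subjectStream1.asObservable();

private subjectStream2 = new BehaviorSubject<any>(null);
stream2$: Observable<any> = this.subjectStream2.asObservable();

// take(1) completes each source after its current value, so forkJoin can emit.
forkJoin({
  stream1: this.stream1$.pipe(take(1)),
  stream2: this.stream2$.pipe(take(1))
})
// _destroyed$ is assumed to be a teardown notifier defined elsewhere in the class.
.pipe(takeUntil(this._destroyed$))
.subscribe(values => console.log(values));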
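
For a version that runs outside a class, the take(1) approach can be sketched roughly like this. The names first$, second$ and snapshots are invented for the example, and the object form of forkJoin assumes RxJS 6.5 or later:

```typescript
import { BehaviorSubject, forkJoin } from 'rxjs';
import { take } from 'rxjs/operators';

// Hypothetical subjects, named for this illustration only.
const first$ = new BehaviorSubject<number>(2);
const second$ = new BehaviorSubject<string>('two');

// Collect the emitted objects so they can be inspected afterwards.
const snapshots: Array<{ first: number; second: string }> = [];

// take(1) turns each subject into a stream that emits its current value
// and completes immediately, which is what forkJoin is waiting for.
forkJoin({
  first: first$.pipe(take(1)),
  second: second$.pipe(take(1)),
}).subscribe(values => snapshots.push(values));

// The subjects themselves were never completed, so they keep working.
first$.next(3);

console.log(snapshots);         // [{ first: 2, second: 'two' }]
console.log(first$.getValue()); // 3
```

Unlike calling complete(), this leaves both subjects open for other subscribers, at the cost of capturing only the values that were current at subscription time.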
sjcoder answered Oct 12 '22 15:10