Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Multiple subscriptions to Observable

People also ask

Can I subscribe to an Observable in the service?

The only case where the service might subscribe is if a method is called with an Observable and the service method's job is to do something with the data that the Observable eventually emits.

Can an Observable have multiple subscribers?

To have multiple functions subscribe to a single Observable, just subscribe them to that observable, it is that simple. And actually that's what you did.

How do you emit multiple values in Observable?

Observables can emit multiple values Promises reject/resolve a single event. An Observable will emit events where a defined callback executes for each event. If you want to handle a single event, use a Promise. If you want to stream multiple events from the same API, use Observables.


Subject

In your case, you could simply use a Subject. A subject allows you to share a single execution with multiple observers when using it as a proxy for a group of subscribers and a source.

In essence, here's your example using a subject:

const subject = new Subject();

function trigger(something) {
    subject.next(something);
}

subject.subscribe((x) => console.log('a: ' + x));
subject.subscribe((x) => console.log('b: ' + x));

trigger('TEST');

Result:

a: TEST
b: TEST

Pitfall: Observers arriving too late

Note that the timing of when you subscribe and when you broadcast the data is relevant. If you send a broadcast before subscribing, you're not getting notified by this broadcast:

function trigger(something) {
    subject.next(something);
}

trigger('TEST');

subject.subscribe((x) => console.log('a: ' + x));
subject.subscribe((x) => console.log('b: ' + x));

Result: (empty)


ReplaySubject & BehaviorSubject

If you want to ensure that even future subscribers get notified, you can use a ReplaySubject or a BehaviorSubject instead.

Here's an example using a ReplaySubject (with a cache-size of 5, meaning up to 5 values from the past will be remembered, as opposed to a BehaviorSubject which can remember only the last value):

const subject = new ReplaySubject(5); // buffer size is 5

function trigger(something) {
    subject.next(something);
}

trigger('TEST');

subject.subscribe((x) => console.log('a: ' + x));
subject.subscribe((x) => console.log('b: ' + x));

Result:

a: TEST
b: TEST

To have multiple functions subscribe to a single Observable, just subscribe them to that observable, it is that simple. And actually that's what you did.

BUT your code does not work because after notificationArrayStream.subscribe((x) => console.log('b: ' + x)) is executed, observer is (x) => console.log('b: ' + x)), so observer.next will give you b: TEST.

So basically it is your observable creation which is wrong. In create you passed an observer as parameter so you can pass it values. Those values you need to generate somehow through your own logic, but as you can see your logic here is erroneous. I would recommend you use a subject if you want to push values to the observer.

Something like:

const notificationArrayStream = Rx.Observable.create(function (obs) {
  mySubject.subscribe(obs);
  return () => {}
})

function trigger(something) {
  mySubject.next(something)
}

Every time you subscribe, you are overriding the var observer.

The trigger function only reference this one var, hence no surprise there is only one log.

If we make the var an array it works as intended: JS Bin

let obs = [];

let foo = Rx.Observable.create(function (observer) {
  obs.push(observer);
});

function trigger(sth){
//   console.log('trigger fn');
  obs.forEach(ob => ob.next(sth));
}

foo.subscribe(function (x) {
  console.log(`a:${x}`);
});
foo.subscribe(function (y) {
  console.log(`b:${y}`);
});

trigger(1);
trigger(2);
trigger(3);
trigger(4);

A cleaner solution would be to use Subject, as suggested above.


Observables are not multicasting; unless you use any kind of Subject. You can of course create Subject, pipe the Observable output into like other answers propose.

However if you already have an Observalbe, it is way more convenient to use share() that turns Observable into Subject or shareReplay(n) which would be equivalent for ReplaySubject(n):

import {share} from 'rxjs/operators';

let observer = null

const notificationArrayStream = new Observable(obs => {
  observer = obs;
}).pipe(share());

function trigger(something) {
  observer.next(something)
}

notificationArrayStream.subscribe((x) => console.log('a: ' + x))
notificationArrayStream.subscribe((x) => console.log('b: ' + x))

trigger('TEST')

That's pretty much it.