Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

rxjs/Observable: Running a function once after getting first stream (continuous observable)

First off, sorry for the long title.

I am trying to subscribe to an array of continuous streams with forEach from angularfire2, but I would also like to run a function after I have confirmed that the first set of data has come in:

this.people.forEach((person) => {
    person.items = this.database.list('/items' + person.key);
    person.items.subscribe((data) => {person.itemsList = data});
});

myIntendedFunction();

Is there a way to place myIntendedFunction() such that:

  1. It runs after the first stream of data is received for each person, and
  2. It runs only once?
like image 576
shinglesmingles Avatar asked Aug 22 '16 07:08

shinglesmingles


People also ask

Can you subscribe to observable multiple times?

It turns out that as the Observable is just a definition, let's remember that in a sense its something close to a function declaration, if we subscribe to it multiple times this means that each time a new HTTP request will be issued, one for each subscription.

What happens when you subscribe to an observable?

Observables are declarative —that is, you define a function for publishing values, but it is not executed until a consumer subscribes to it. The subscribed consumer then receives notifications until the function completes, or until they unsubscribe.

What happens if you don't subscribe to an observable?

If you don't subscribe nothing is going to happen. It's good to know that when you subscribe to an observer, each call of subscribe() will trigger it's own independent execution for that given observer. Subscribe calls are not shared among multiple subscribers to the same observable.

Does observable stream data synchronously and asynchronously?

An observable produces values over time. An array is created as a static set of values. In a sense, observables are asynchronous where arrays are synchronous. In the following examples, → implies asynchronous value delivery.


1 Answers

This is a little more complicated to accomplish if you don't want the first request to happen twice:

const connectables: ConnectableObservable<any>[] = [];

this.people.forEach(person => {
    person.items = this.database.list('/items' + person.key);
    const connectable = person.items.publish();
    connectables.push(connectable);
    connectable.subscribe((data) => {person.itemsList = data});
});

Observable.zip(...connectables).take(1).subscribe(myIntendedFunction);

connectables.forEach(c => c.connect());

What happens here is this: The effect of publish() is basically that you can subscribe to the same stream of data multiple times. Also the subscription function won't be invoked until you call connect(). If we used person.items.share() which is sugar for person.items.publish().connect(), it would make the requests right away and our application might be buggy due to race-conditions.

zip() waits for every passed observable to emit an item and emits those items all at once as an array. We only want this to happen on the first set of items, so we just take(1).

like image 66
j2L4e Avatar answered May 28 '23 06:05

j2L4e