Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How does the RxJs 5 share() operator work?

Its not 100% clear for me how the RxJs 5 share() operator works, see here the latest docs. Jsbin for the question here.

If I create an observable with a series of 0 to 2, each value separated by one second:

var source = Rx.Observable.interval(1000)
.take(5)
.do(function (x) {
    console.log('some side effect');
});

And if I create two subscribers to this observable:

source.subscribe((n) => console.log("subscriptor 1 = " + n));
source.subscribe((n) => console.log("subscriptor 2 = " + n));

I get this in the console:

"some side effect ..."
"subscriptor 1 = 0"
"some side effect ..."
"subscriptor 2 = 0"
"some side effect ..."
"subscriptor 1 = 1"
"some side effect ..."
"subscriptor 2 = 1"
"some side effect ..."
"subscriptor 1 = 2"
"some side effect ..."
"subscriptor 2 = 2"

I thought each subscription would subscribe to the same Observable, but it does not seem to be the case! Its like the act of subscribing creates a completely separate Observable!

But if the share() operator is added to the source observable:

var source = Rx.Observable.interval(1000)
.take(3)
.do(function (x) {
    console.log('some side effect ...');
})
.share();

Then we get this:

"some side effect ..."
"subscriptor 1 = 0"
"subscriptor 2 = 0"
"some side effect ..."
"subscriptor 1 = 1"
"subscriptor 2 = 1"
"some side effect ..."
"subscriptor 1 = 2"
"subscriptor 2 = 2"

Which is what I would expect without the share().

Whats going on here, how does the share()operator work ? Does each subscription create a new Observable chain?

like image 897
Angular University Avatar asked Feb 01 '16 22:02

Angular University


People also ask

What is Share operator in RxJS?

RxJS share() operator is a multicasting operator which returns a new observable that shares or multicasts the original observable. As long as there is at least one subscriber, this observable will be subscribed and emitting data. When all subscribers have unsubscribed, it will unsubscribe from the source observable.

What is of () RxJS?

RxJS' of() is a creational operator that allows you to create an RxJS Observable from a sequence of values. According to the official docs: of() converts the arguments to an observable sequence. In Angular, you can use the of() operator to implement many use cases.

What does of operator do in RxJS?

The of Operator is a creation Operator. Creation Operators are functions that create an Observable stream from a source. The of Operator will create an Observable that emits a variable amount of values in sequence, followed by a Completion notification.

How do RxJS Observables work?

Observables are data source wrappers and then the observer executes some instructions when there is a new value or a change in data values. The Observable is connected to the observer who does the execution through subscription, with a subscribe method the observer connects to the observable to execute a code block.


2 Answers

Be careful that you are using RxJS v5 while your documentation link seem to be RxJS v4. I don't remember specifics but I think that the share operator went through some changes, in particular when it comes to completion and resubscription, but don't take my word for it.

Back to your question, as you have shown in your study, your expectations do not correspond to the library design. Observables lazily instantiate their data flow, concretely initiating the dataflow when a subscriber subscribes. When a second subscriber subscribes to the same observable, another new dataflow is started as if it is was the first subscriber (so yes, each subscription creates a new chain of observables as you said). This is what is coined in RxJS terminology as a cold observable and that's the default behaviour for RxJS observable. If you want an observable which sends its data to the subscribers it has at the moment the data arrives, this is coined a hot observable, and one way to get a hot observable is to use the share operator.

You can find illustrated subscription and data flows here : Hot and Cold observables : are there 'hot' and 'cold' operators? (this is valid for RxJS v4, but most of it is valid for v5).

like image 184
user3743222 Avatar answered Oct 19 '22 04:10

user3743222


share makes the observable "hot" if these 2 conditions are met:

  1. the number of subscribers > 0
  2. AND the observable has not completed

Scenario1: number of subscribers > 0 and observable is not completed before a new subscription

var shared  = rx.Observable.interval(5000).take(2).share();
var startTime = Date.now();
var log = (x) => (value) => { 
    console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
};

var observer1 = shared.subscribe(log('observer1')),
    observer2;

setTimeout(()=>{
    observer2 = shared.subscribe(log('observer2'));
}, 3000);

// emission for both observer 1 and observer 2, with the samve value at startTime + 5 seconds
// another emission for both observers at: startTime + 10 seconds

Scenario 2: number of subscribers is zero before a new subscription. Becomes "cold"

 var shared  = rx.Observable.interval(5000).take(2).share();
    var startTime = Date.now();
    var log = (x) => (value) => { 
    console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
};

var observer1 = shared.subscribe(log('observer1')),
    observer2;

setTimeout(()=>{
    observer1.unsubscribe(); 
}, 1000);

setTimeout(()=>{
    observer2 = shared.subscribe(log('observer2')); // number of subscribers is 0 at this time
}, 3000);
// observer2's onNext is called at startTime + 8 seconds
// observer2's onNext is called at startTime + 13 seconds

Scenario 3: when observable was completed before a new subscription. Becomes "cold"

 var shared  = rx.Observable.interval(5000).take(2).share();
    var startTime = Date.now();
    var log = (x) => (value) => { 
        console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
    };

var observer1 = shared.subscribe(log('observer1')),
    observer2;

setTimeout(()=>{
    observer2 = shared.subscribe(log('observer2'));
}, 12000);

// 2 emission for observable 1, at startTime + 5 secs, and at startTime + 10secs
// 2 emissions for observable 2,at startTime + 12 + 5 secs, and at startTime + 12 + 10secs
like image 22
sbr Avatar answered Oct 19 '22 06:10

sbr