
Is Rx.Subject a hot observable?

Tags: rxjs, rxjs5

The code

const a = new Rx.Subject().do(x => console.log('a'))
const b = a.mapTo(0)
const c = a.mapTo(1)
const d = Rx.Observable.merge(b, c)
d.subscribe(x => console.log('d'))
a.next(3)

And the output

a
d
a
d

Why does a get printed twice? Isn't Rx.Subject a hot observable?

c c asked Feb 24 '17 09:02


2 Answers

The Subject itself is hot/shared.

However, most operators that you append create a new stream with the previous stream (in this case the Subject) as its source. For most operators that new stream is not hot: every subscription to it sets up its own pass through the operator chain. It only becomes hot when you append a multicasting operator such as share or publish.
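To make that concrete, here is a minimal library-free sketch of the idea. It is a model, not RxJS's actual implementation: `createSubject` and `tap` are hypothetical helpers standing in for `Rx.Subject` and `do`, and the two `subscribe` calls stand in for the subscriptions that `merge(b, c)` makes.

```javascript
// Hypothetical model, not RxJS itself.
// A "subject" keeps a list of observers and pushes each value to all of them.
function createSubject() {
  const observers = [];
  return {
    subscribe(observer) { observers.push(observer); },
    next(value) { observers.forEach(observer => observer(value)); },
  };
}

// An operator returns a NEW stream. Every subscribe() call on that stream
// creates its own subscription to the source, so the side effect is
// registered (and later executed) once per subscriber.
function tap(source, sideEffect) {
  return {
    subscribe(observer) {
      source.subscribe(value => {
        sideEffect(value);
        observer(value);
      });
    },
  };
}

const log = [];
const subject = createSubject();
const a = tap(subject, () => log.push('a'));

a.subscribe(() => log.push('d')); // plays the role of b inside merge
a.subscribe(() => log.push('d')); // plays the role of c inside merge
subject.next(3);

console.log(log.join(' ')); // a d a d
```

The subject delivers the value once to each of its two internal observers, and each of those observers carries its own copy of the side effect, which is why a shows up twice.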

So once you share the stream after the do, the side effect runs only once per value and everything works as expected.

const a = new Rx.Subject().do(x => console.log('a')).share();
const b = a.mapTo(0);
const c = a.mapTo(1);
const d = Rx.Observable.merge(b, c)
d.subscribe(x => console.log('d'));
a.next(3);
olsn answered Oct 20 '22 18:10


To see why, you need to understand cold and hot Observables and Subjects.

A cold Observable is an Observable that re-executes its subscribe handler every time it's subscribed to:

const cold = new Rx.Observable(function subscribe(observer) {
  console.log('subscribed');
  observer.next(Math.random());
  observer.complete();
});
cold.subscribe((num) => console.log('sub 1:', num));
// > subscribed
// > sub 1: 0.1231231231231
cold.subscribe((num) => console.log('sub 2:', num));
// > subscribed
// > sub 2: 0.09805969045

A hot Observable is a source Observable (cold or otherwise) that has a Subject between the source and its subscribers. When a hot Observable is subscribed to, the subscription is transparently routed to the inner Subject, and the Subject is the one subscribed to the source Observable. This ensures the source Observable has only one subscriber (the Subject), and the Subject shares the source's values with many subscribers:

const cold = new Rx.Observable(function subscribe(observer) {
  console.log('subscribed');
  observer.next(Math.random());
  observer.complete();
});

const hot = cold.publish();
hot.subscribe((num) => console.log('sub 1:', num));
hot.subscribe((num) => console.log('sub 2:', num));
hot.connect(); // <-- this subscribes the inner Subject to the cold source
// > subscribed
// > sub 1: 0.249848935489
// > sub 2: 0.249848935489

You can make an Observable hot via multicast, which takes a function that returns the Subject to use when it is connected. There are also convenience variants of multicast (such as publish) that create specific types of Subjects: publish() is shorthand for multicast(() => new Subject()).

In addition to connect(), which subscribes the inner Subject to the source and returns the underlying Subscription, you can call refCount(), which returns an Observable. When the Observable returned by refCount() is subscribed to for the first time, it automatically calls connect() internally; subsequent subscriptions do not reconnect. When all subscribers have unsubscribed, refCount automatically unsubscribes the inner Subject from the source. share() is shorthand for source.publish().refCount().
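The connect-on-first-subscriber and disconnect-on-last-unsubscribe behaviour can be sketched without the library. This is only a model under simplifying assumptions (no completion or error handling); `createSubject`, `tap`, and `share` here are hypothetical helpers, not the RxJS functions of the same name.

```javascript
// Hypothetical model of publish().refCount(); not RxJS's implementation.
function createSubject() {
  const observers = new Set();
  return {
    subscribe(observer) {
      observers.add(observer);
      return () => observers.delete(observer); // unsubscribe function
    },
    next(value) { observers.forEach(observer => observer(value)); },
  };
}

// Stand-in for `do`: runs a side effect for each value, per subscription.
function tap(source, sideEffect) {
  return {
    subscribe(observer) {
      return source.subscribe(value => { sideEffect(value); observer(value); });
    },
  };
}

// Places an inner subject between the source and all subscribers.
function share(source) {
  const inner = createSubject();
  let refCount = 0;
  let disconnect = null;
  return {
    subscribe(observer) {
      const removeObserver = inner.subscribe(observer);
      refCount += 1;
      if (refCount === 1) {
        // First subscriber: connect the inner subject to the source.
        disconnect = source.subscribe(value => inner.next(value));
      }
      return () => {
        removeObserver();
        refCount -= 1;
        if (refCount === 0) {
          // Last subscriber gone: disconnect from the source.
          disconnect();
          disconnect = null;
        }
      };
    },
  };
}

const events = [];
const subject = createSubject();
const shared = share(tap(subject, () => events.push('a')));

const stopFirst = shared.subscribe(() => events.push('d1'));
const stopSecond = shared.subscribe(() => events.push('d2'));
subject.next(3); // one pass through tap, then fan-out to both subscribers

stopFirst();
stopSecond();
subject.next(4); // nobody is subscribed, so the side effect does not run

console.log(events.join(' ')); // a d1 d2
```

Because only the inner subject subscribes to the source, the side effect runs once per value no matter how many subscribers are attached.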

So adding share() after the do makes the original example work as expected:

const a = new Rx.Subject().do(x => console.log('a')).share();
const b = a.mapTo(0);
const c = a.mapTo(1);
const d = Rx.Observable.merge(b, c)
d.subscribe(x => console.log('d'));
a.next(3);
Bhargav Patel answered Oct 20 '22 17:10