
How to create Hot Observable in RxJS 5

I use RxJS 4 to create a hot Observable as described in the official documentation.

But the RxJS 5 migration guide doesn't explain how to create a hot Observable.

const source = Observable.create((observer) => {
  asynCall((data) => {
    observer.onNext(data)
  })
})
const published = source.publish()
published.connect()
published.subscribe((data) => {
  console.log(data)
})

With Rx5 I've got the following error:

Uncaught TypeError: source.publish is not a function(…)

Gregory Houllier asked May 11 '16 07:05


2 Answers

You can use .publish().refCount() (or .share() as an alias)

var source = Rx.Observable.interval(1000)
    .take(5)
    .publish().refCount();

source.subscribe( item => console.log(`-: ${item}`));
setTimeout(() => {
    source.subscribe( item => console.log(`--: ${item}`));
}, 2500);

http://jsbin.com/cupibitehu/1/edit?js,console

The above sets up two subscriptions to show that the delayed subscriber picks up at the current emission instead of replaying previously emitted values.

It logs the following:

"-: 0"
"-: 1"
"-: 2"
"--: 2"
"-: 3"
"--: 3"
"-: 4"
"--: 4"

http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-share

subhaze answered Nov 14 '22 23:11


Here is a take on hot and cold observables:

Hot Observable

Any observable that does not create a new producer on subscription. Calling obs.subscribe() does not set up a fresh source of observer.next(val) calls for that subscription; it attaches to the existing producer and therefore receives values from the current point in time onward.

Cold Observable

Any observable that creates a new producer on subscription: each obs.subscribe() call sets up its own source of observer.next() calls for that subscription alone. All HttpClient observables in Angular are of this type. For each obs.subscribe() in a component, you see values emitted from the API for that subscription alone, and every time the sequence starts from the beginning.
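The two definitions can be sketched without RxJS at all. This is a hand-rolled illustration, not how RxJS is implemented; coldSource, hotSource and emit are hypothetical names used only here.

```javascript
// Hand-rolled illustration only; these helpers are hypothetical, not RxJS APIs.

// Cold: every call runs the producer again from the start for that subscriber.
function coldSource(subscriber) {
  [1, 2, 3].forEach(value => subscriber(value));
}

// Hot: one shared producer; subscribers only see values emitted after they join.
const hotSubscribers = [];
const hotSource = {
  subscribe(subscriber) { hotSubscribers.push(subscriber); },
  emit(value) { hotSubscribers.forEach(subscriber => subscriber(value)); }
};

const coldA = [], coldB = [];
coldSource(v => coldA.push(v));
coldSource(v => coldB.push(v)); // gets its own full run of values

const hotA = [], hotB = [];
hotSource.subscribe(v => hotA.push(v));
hotSource.emit(1);
hotSource.subscribe(v => hotB.push(v)); // joins late and misses the first value
hotSource.emit(2);
hotSource.emit(3);

console.log(coldA, coldB); // both cold subscribers received 1, 2, 3
console.log(hotA, hotB);   // hotA received 1, 2, 3; hotB received only 2, 3
```

The late hot subscriber never sees the value emitted before it joined, which matches the "--: 2" line in the log from the first answer.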

So what the publish() and refCount() operators did above was create an observable that behaves partly like a hot and partly like a cold observable. It is hot because subscribers share one producer and no new source values are produced per subscription; it is cold in that it only starts emitting once obs.subscribe() is called for the first time.
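A rough sketch of that reference-counting idea in plain JavaScript follows. It is an assumption-laden illustration, not the RxJS source: makeRefCounted, starts and push are hypothetical names, and the producer is driven by hand so the order of events is easy to follow.

```javascript
// Hand-rolled illustration only; `makeRefCounted` is a hypothetical helper, not an RxJS API.
function makeRefCounted(start) {
  const subscribers = new Set();
  let stop = null;
  return {
    subscribe(subscriber) {
      subscribers.add(subscriber);
      // The first subscriber starts the single shared producer.
      if (subscribers.size === 1) {
        stop = start(value => subscribers.forEach(s => s(value)));
      }
      // The returned function unsubscribes; the last one out stops the producer.
      return () => {
        subscribers.delete(subscriber);
        if (subscribers.size === 0 && stop) { stop(); stop = null; }
      };
    }
  };
}

let starts = 0;   // how many times the producer was started
let push = null;  // manual handle for emitting values in this demo
const shared = makeRefCounted(next => {
  starts += 1;
  push = next;
  return () => { push = null; };
});

const a = [], b = [];
const unsubA = shared.subscribe(v => a.push(v)); // starts the producer
push(0);
const unsubB = shared.subscribe(v => b.push(v)); // joins the running producer
push(1);
unsubA();
unsubB(); // no subscribers left, so the producer is stopped

console.log(starts, a, b); // producer started once; a saw 0 and 1, b saw only 1
```

The producer is started once no matter how many subscribers join (the hot part), but nothing runs until the first subscribe call (the cold part).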

To produce a hot observable that does not need a subscription in order to start, publish() and connect() will do the trick:

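var source = Rx.Observable.create(observer => {
    // Stand-in for the original placeholder: an assumed synchronous producer
    observer.next(1);
    observer.next(2);
})
    .publish();
source.connect(); // starts the producer immediately, before anyone subscribes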

source.subscribe( item => console.log(`-: ${item}`));
setTimeout(() => {
    source.subscribe( item => console.log(`--: ${item}`));
}, 2500);

You would not see anything in the console, because the observable has already emitted its values by the time the subscriptions happen. Note that if the observable emits its values after a delay, you might still see them in the subscriptions.

I also want to give credit to this article, which explains it very well: https://blog.thoughtram.io/angular/2016/06/16/cold-vs-hot-observables.html
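The timing effect described above can be reproduced with a small plain-JavaScript sketch. It is only an illustration of the idea, not RxJS itself: makeConnectable and syncProducer are hypothetical names, and the producer is assumed to emit synchronously.

```javascript
// Hand-rolled illustration only; `makeConnectable` is a hypothetical helper, not an RxJS API.
function makeConnectable(producer) {
  const subscribers = [];
  return {
    subscribe(subscriber) { subscribers.push(subscriber); },
    // connect() runs the producer once and fans each value out to the current subscribers.
    connect() { producer(value => subscribers.forEach(s => s(value))); }
  };
}

// Assumed synchronous producer: emits everything the moment it is started.
const syncProducer = next => { next('a'); next('b'); };

// Subscribing after connect(): the synchronous values have already been emitted.
const late = [];
const first = makeConnectable(syncProducer);
first.connect();
first.subscribe(v => late.push(v));

// Subscribing before connect(): every value is received.
const early = [];
const second = makeConnectable(syncProducer);
second.subscribe(v => early.push(v));
second.connect();

console.log(late);  // empty: nothing was left to receive
console.log(early); // both values were received
```

Under these assumptions, the order of subscribe and connect decides whether a subscriber sees anything when the producer is synchronous.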

vijayakumarpsg587 answered Nov 15 '22 00:11