
RxJS multiple subscriptions for Observable.Interval

Is there a solution for RxJS like the one in "Is it possible to invoke subscribers' OnNexts on different threads in Rx?"

P.S. My first, naive approach (in CoffeeScript) obviously failed:

hObs = Rx.Observable.interval(35000)
    .startWith(-1)
    .select(moment().format("D MMMM, HH:mm:ss"))
    .publish()

hObs.subscribe((x)->console.log(x))
hObs.connect()
hObs.subscribe((x)->console.log(x, 1))
hObs.connect() 

The second subscription prints nothing for the first 35-second interval, and so on.

Daniel K asked Aug 29 '12 14:08


1 Answer

The .select expects a function, not a value. The following works:

(function() {
    var list = document.getElementById("list");
    var stream = Rx.Observable.interval(35000)
    .startWith(-1)
    .select(function(){ return moment().format("D MMMM, HH:mm:ss") });

    stream.subscribe(function(value) {
        var li = document.createElement("li");
        li.innerHTML = "subscriber 1: "+value;
        list.appendChild(li); 
    });    
    stream.subscribe(function(value) {
        var li = document.createElement("li");
        li.innerHTML = "subscriber 2: "+value;
        list.appendChild(li); 
    });
})();

http://jsfiddle.net/9EjSQ/43/
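
To see why passing a value fails, here is a small sketch that uses Array.prototype.map as a stand-in for .select. That substitution is an assumption made for illustration; RxJS may surface the error differently. The helper now() is hypothetical and replaces moment() so the result is deterministic.

```javascript
// Hypothetical stand-in for moment().format(...): returns a new label per call.
let tick = 0;
function now() { tick += 1; return "t" + tick; }

// Passing a value: now() runs once, before map is even entered,
// and map then rejects the resulting string because it is not callable.
let valueError = null;
try {
  [0, 1, 2].map(now());
} catch (e) {
  valueError = e;
}

// Passing a function: now() runs again for every element.
const stamps = [0, 1, 2].map(function () { return now(); });

console.log(valueError instanceof TypeError); // true
console.log(stamps); // [ 't2', 't3', 't4' ]
```

The first call consumed "t1" before failing, which is why the mapped labels start at "t2".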

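Note that you don't need to call connect() twice; typically you call it only once. Better still, let the connection happen automatically by ending the observable chain with .publish().refCount(). .publish() returns a ConnectableObservable, which shares a single subscription to the source among all subscribers, in other words a "hot observable" (see https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/creating.md#cold-vs-hot-observables). .refCount() connects it when the first subscriber arrives and disconnects it when the last one unsubscribes. In this case we didn't need a hot observable: each subscriber simply gets its own run of the cold stream.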
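
The cold/hot difference can be sketched without RxJS. The following is a hypothetical minimal model, not the library's implementation: makeClock, intervalFromMinusOne and this publish are made-up names, and a manual clock replaces real timers so the outcome is deterministic.

```javascript
// Hypothetical manual clock standing in for real timers.
function makeClock() {
  const listeners = [];
  return {
    onTick: function (fn) { listeners.push(fn); },
    tick: function () { listeners.slice().forEach(function (fn) { fn(); }); }
  };
}

// Cold source modelling interval(...).startWith(-1):
// every subscribe() call starts its own counter and replays the -1.
function intervalFromMinusOne(clock) {
  return {
    subscribe: function (onNext) {
      let count = 0;
      onNext(-1);
      clock.onTick(function () { onNext(count++); });
    }
  };
}

// publish()-like wrapper: one shared subscription, started by connect().
function publish(source) {
  const observers = [];
  return {
    subscribe: function (onNext) { observers.push(onNext); },
    connect: function () {
      source.subscribe(function (value) {
        observers.slice().forEach(function (onNext) { onNext(value); });
      });
    }
  };
}

// Cold: the late subscriber still gets its own -1 and its own counter.
const coldClock = makeClock();
const coldStream = intervalFromMinusOne(coldClock);
const coldA = [], coldB = [];
coldStream.subscribe(function (x) { coldA.push(x); });
coldClock.tick();
coldStream.subscribe(function (x) { coldB.push(x); });
coldClock.tick();

// Hot: the subscriber that arrives after connect() misses the -1.
const hotClock = makeClock();
const hotStream = publish(intervalFromMinusOne(hotClock));
const hotA = [], hotB = [];
hotStream.subscribe(function (x) { hotA.push(x); });
hotStream.connect();
hotStream.subscribe(function (x) { hotB.push(x); });
hotClock.tick();

console.log(coldA, coldB); // [ -1, 0, 1 ] [ -1, 0 ]
console.log(hotA, hotB);   // [ -1, 0 ] [ 0 ]
```

In the hot case the second subscriber receives nothing until the next tick, which matches the behaviour described in the question.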

In CoffeeScript:

list = document.getElementById("list")
stream = Rx.Observable.interval(35000)
    .startWith(-1)
    .select(-> moment().format("D MMMM, HH:mm:ss"))

stream.subscribe((value) ->
    li = document.createElement("li")
    li.innerHTML = "subscriber 1: " + value
    list.appendChild(li)
)
stream.subscribe((value) ->
    li = document.createElement("li")
    li.innerHTML = "subscriber 2: " + value
    list.appendChild(li)
)

http://jsfiddle.net/9EjSQ/44/

André Staltz answered Oct 17 '22 07:10