I have a very simple timeInterval observable and I want to start/stop transmission without disconnecting subscribers (which should sit and wait regardless of observable status). Is possible, and if so how?
var source = Rx.Observable
.interval(500)
.timeInterval()
.map(function (x) { return x.value + ':' + x.interval; })
.take(10);
var subscription = source.subscribe(
function (x) {
$("#result").append('Next: ' + x + ' ');
},
function (err) {
$("#result").append('Error: ' + err);
},
function () {
$("#result").append('Completed');
});
general comment: most of the examples ive seen show how to define observables and subscribers. how do i affect the behavior of existing objects?
unsubscribe(); Note that since interval() is asynchronous you can call unsubscribe() inside the subscribe 's callback as well. Jul 2019: Updated for RxJS 6. subscribe leads to imperative reactive code, particularly when the intent is to unsubscribe .
Good. Manually unsubscribe from each observable. To unsubscribe from an observable subscription, we must create a Subscription variable (timer$), assign the subscription to this variable, and then in the ngOnDestroy lifecycle hook unsubscribe the subscription.
There're are basically two ways: call unsubscribe() on the Subscription object returned from the subscribe() call . use an operator.
Depends on what is the source of the stop/resume signal. The simplest way I can think about is with the pausable
operator, which as the documentation says works better with hot observables. So in the following sample code, I removed the take(10)
(your pausable signal now comes through the pauser
subject), and added share
to turn your observable into a hot one.
var pauser = new Rx.Subject(); var source = Rx.Observable .interval(500) .timeInterval() .map(function (x) { return x.value + ':' + x.interval; }) .share() .pausable(pauser); var subscription = source.subscribe( function (x) { $("#result").append('Next: ' + x + ' '); }, function (err) { $("#result").append('Error: ' + err); }, function () { $("#result").append('Completed'); }); // To begin the flow pauser.onNext(true); // or source.resume(); // To pause the flow at any point pauser.onNext(false); // or source.pause();
Here is a more sophisticated example which will pause your source every 10 items:
// Helper functions
function emits ( who, who_ ) {return function ( x ) {
who.innerHTML = [who.innerHTML, who_ + " emits " + JSON.stringify(x)].join("\n");
};}
var pauser = new Rx.Subject();
var source = Rx.Observable
.interval(500)
.timeInterval()
.map(function (x) { return x.value + ':' + x.interval; })
.share();
var pausableSource = source
.pausable(pauser);
source
.scan(function (acc, _){return acc+1}, 0)
.map(function(counter){return !!(parseInt(counter/10) % 2)})
.do(emits(ta_validation, 'scan'))
.subscribe(pauser);
var subscription = pausableSource.subscribe(
function (x) {
$("#ta_result").append('Next: ' + x + ' ');
},
function (err) {
$("#ta_result").append('Error: ' + err);
},
function () {
$("#ta_result").append('Completed');
});
You should have by now your answer to the second question. Combine the observables you are given with the relevant RxJS operators to realize your use case. This is what I did here.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With