I'm trying to create an observable using RxJS that does what is pictured.
This should do the trick.
var Rx = require('rx'),
source = Rx.Observable.interval(10).take(100),
log = console.log.bind(console);
Rx.Observable.create(function (observer) {
var delaying = false,
hasValue = false,
complete = false,
value;
function onNext (x) {
value = x;
if (delaying) {
hasValue = true;
} else {
sendValue();
}
}
function sendValue () {
observer.onNext(value);
if (complete) {
observer.onCompleted();
} else {
setTimeout(callback, 1000); // exercise for the reader. Use a scheduler.
}
delaying = true;
}
function callback () {
if (hasValue) {
hasValue = false;
sendValue();
} else {
delaying = false;
}
}
return source.subscribe(
onNext,
observer.onError.bind(observer),
function () {
if (hasValue) {
complete = true;
} else {
observer.onCompleted();
}
}
);
})
.subscribe(log);
Here is Christopher's solution modified into a operator.
The throttleImmediate
operator only stores the latest value from the source until the given selector completes. It fires the cached value, if existent, right after each completion.
It is best suited to use when the selector has side effects (e.g. an animation).
var Rx = require('rx'),
source = Rx.Observable.interval(10).take(500),
log = console.log.bind(console);
Rx.Observable.prototype.throttleImmediate = function (selector) {
var source = this;
return Rx.Observable.create(function (observer) {
var delaying = false,
hasValue = false,
complete = false,
value;
function onNext (x) {
value = x;
if (delaying) {
hasValue = true;
} else {
sendValue();
}
}
function sendValue () {
delaying = true;
selector(value).subscribe(
observer.onNext.bind(observer),
observer.onError.bind(observer),
function(){
if (hasValue) {
hasValue = false;
sendValue();
} else {
delaying = false;
}
}
);
}
return source.subscribe(
onNext,
observer.onError.bind(observer),
function () {
if (hasValue) {
complete = true;
} else {
observer.onCompleted();
}
}
);
});
};
source
.throttleImmediate(function(data){
var delay;
if(data%2==0)
delay=500;
else
delay=1000;
return Rx.Observable.timer(delay).map(function(){ return data; });
})
.subscribe(log)
This comes in handy while back pressuring sources where the value to delay is only known by the selector.
Example: Given the question's marble diagram.
Let's suppose the first source are ajax calls with html data to display, ajaxPages
that originated from clicks on a navbar.
And we want render them along with an entry animation, animatePage
, whose duration is dynamic.
ajaxPages.throttleImmediate(animatePage).subscribe();
Here we animate the pages with the values from the source, skipping all the values that are emitted during the period of animation except the latest.
In practice, what we get is an stream that ignores clicks that are shortly followed by other clicks and are useless to show to the user since they would animate in, and immediately animate out.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With