The examples in the RxJS README seem to suggest we have to subscribe to a source. In other words: we wait for the source to send events. In that sense, sources seem to be push-based: the source decides when it creates new items.
This contrasts, however, with iterators, where strictly speaking new items need only be created when requested, i.e., when a call is made to next(). This is pull-based behavior, also known as lazy generation.
For instance, a stream could return all Wikipedia pages for prime numbers. The items are only generated when you ask for them, because generating all of them upfront is quite an investment, and maybe only 2 or 3 of them might be read anyway.
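To make the pull-based idea concrete, here is a minimal sketch using a plain JavaScript generator. The names isPrime, primePages, and generated are hypothetical, and the string stands in for an actual Wikipedia fetch; the point is only that nothing is produced until next() is called.

```javascript
// Hypothetical helper: trial-division primality test.
function isPrime(n) {
  if (n < 2) return false;
  for (let d = 2; d * d <= n; d++) {
    if (n % d === 0) return false;
  }
  return true;
}

// Records which primes have actually been generated so far.
const generated = [];

// Infinite, lazy sequence: the body only runs when next() is called.
function* primePages() {
  for (let n = 2; ; n++) {
    if (isPrime(n)) {
      generated.push(n);
      // Stand-in for fetching the real page (assumption for illustration).
      yield "Wikipedia page for prime " + n;
    }
  }
}

const pages = primePages();   // nothing generated yet
const first = pages.next().value;
const second = pages.next().value;
```

After the two next() calls only the pages for 2 and 3 have been generated, even though the sequence itself is unbounded.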
Can RxJS also have such pull-based behavior, so that new items are only generated when you ask for them?
The page on backpressure seems to indicate that this is not possible yet.
The short answer is no.
RxJS is designed for reactive applications, so, as you already mentioned, if you need pull-based semantics you should be using an Iterator instead of an Observable. Observables are designed to be the push-based counterparts to the iterator, so they really occupy different spaces, algorithmically speaking.
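The difference in who drives production can be sketched without any library. In the snippet below, pushSource is a hypothetical, hand-rolled stand-in for an observable (it is not the RxJS API), and trace is just a list used to record when each value is produced.

```javascript
// Records the moment each value is produced.
const trace = [];

// Pull: the consumer decides when each value is produced.
function* pullSource() {
  for (let n = 1; n <= 3; n++) {
    trace.push("pull produced " + n);
    yield n;
  }
}

// Push (hypothetical minimal stand-in, not the RxJS API):
// the producer decides when to call the subscriber.
function pushSource(onNext) {
  for (let n = 1; n <= 3; n++) {
    trace.push("push produced " + n);
    onNext(n);
  }
}

const it = pullSource();
it.next();                          // consumer asks for exactly one value
const producedAfterOnePull = trace.length;

const received = [];
pushSource(n => received.push(n));  // producer emits everything it has
```

With the iterator, one request yields one value; with the push source, subscribing hands control to the producer, which delivers all three values without being asked for each one.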
Obviously, I can't say this will never happen, because that is something the community will decide. But as far as I know 1) the semantics for this case just aren't that good and 2) this runs counter to the idea of reacting to data.
A pretty good synopsis can be found here. It is for Rx.Net but the concepts are similarly applicable to RxJS.
The controlled observable from the page you referenced can turn a push observable into a pull-based one.
var controlled = source.controlled();
// this callback will only be invoked after controlled.request()
controlled.subscribe(n => {
  log("controlled: " + n);
  // do some work, then signal for next value
  setTimeout(() => controlled.request(1), 2500);
});
controlled.request(1);
A truly synchronous iterator is not possible, as it would block when the source was not emitting.
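One way around the blocking problem is to make the pull asynchronous, so that next() returns a Promise instead of a value. The following is only a sketch under that assumption: toAsyncPull and the manual emit function are hypothetical names, not part of RxJS, and unlike controlled() this adapter buffers values that arrive before they are requested.

```javascript
// Hypothetical adapter: wraps a push-style subscribe function and exposes
// a pull-style next() that returns a Promise for the next value.
function toAsyncPull(subscribe) {
  const buffered = []; // values pushed before anyone asked for them
  const waiting = [];  // resolvers for next() calls made before a value arrived

  subscribe(value => {
    if (waiting.length > 0) {
      waiting.shift()(value);   // hand the value to the oldest pending request
    } else {
      buffered.push(value);     // keep it until someone asks
    }
  });

  return {
    next() {
      if (buffered.length > 0) {
        return Promise.resolve(buffered.shift());
      }
      // No value yet: do not block, return a Promise that resolves later.
      return new Promise(resolve => waiting.push(resolve));
    }
  };
}

// Manual push source for demonstration: emit() pushes a value to the subscriber.
let emit;
const source = toAsyncPull(onNext => { emit = onNext; });

const results = [];
source.next().then(v => results.push("first: " + v));  // requested before any value exists
emit("a");                                             // resolves the pending request
emit("b");                                             // nobody is waiting, so it is buffered
source.next().then(v => results.push("second: " + v)); // served from the buffer
```

The first request is made while the source is silent, and the caller is not blocked; it simply receives the value once the source emits.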
In the snippet below, the controlled subscriber only gets a single item when it signals, and it does not skip any values.
var output = document.getElementById("output");
var log = function(str) {
  output.value += "\n" + str;
  output.scrollTop = output.scrollHeight;
};
var source = Rx.Observable.timer(0, 1000);
source.subscribe(n => log("source: " + n));
var controlled = source.controlled();
// this callback will only be invoked after controlled.request()
controlled.subscribe(n => {
  log("controlled: " + n);
  // do some work, then signal for next value
  setTimeout(() => controlled.request(1), 2500);
});
controlled.request(1);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.js"></script>
<body>
<textarea id="output" style="width:150px; height: 150px"></textarea>
</body>