Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Can RxJS be used in a pull-based way?

The examples in the RxJS README seem to suggest we have to subscribe to a source. In other words: we wait for the source to send events. In that sense, sources seem to be push-based: the source decides when it creates new items.

This contrasts, however, with iterators, where strictly speaking new items need only be created when requested, i.e., when a call is made to next(). This is pull-based behavior, also known as lazy generation.

For instance, a stream could return all Wikipedia pages for prime numbers. The items are only generated when you ask for them, because generating all of them upfront is quite an investment, and maybe only 2 or 3 of them might be read anyway.

Can RxJS also have such pull-based behavior, so that new items are only generated when you ask for them?

The page on backpressure seems to indicate that this is not possible yet.

like image 350
Ruben Verborgh Avatar asked Nov 11 '15 22:11

Ruben Verborgh


2 Answers

Short answer is no.

RxJS is designed for reactive applications so as you already mentioned if you need pull-based semantics you should be using an Iterator instead of an Observable. Observables are designed to be the push-based counterparts to the iterator, so they really occupy different spaces algorithmically speaking.

Obviously, I can't say this will never happen, because that is something the community will decide. But as far as I know 1) the semantics for this case just aren't that good and 2) this runs counter to the idea of reacting to data.

A pretty good synopsis can be found here. It is for Rx.Net but the concepts are similarly applicable to RxJS.

like image 92
paulpdaniels Avatar answered Nov 02 '22 22:11

paulpdaniels


Controlled observable from the page you referenced can change a push observable to pull.

var controlled = source.controlled();

// this callback will only be invoked after controlled.request()
controlled.subscribe(n => {
  log("controlled: " + n);
  // do some work, then signal for next value
  setTimeout(() => controlled.request(1), 2500);
});

controlled.request(1);

A truly synchronous iterator is not possible, as it would block when the source was not emitting.

In the snippet below, the controlled subscriber only gets a single item when it signals, and it does not skip any values.

var output = document.getElementById("output");
var log = function(str) {
  output.value += "\n" + str;
  output.scrollTop = output.scrollHeight;
};

var source = Rx.Observable.timer(0, 1000);
source.subscribe(n => log("source: " + n));

var controlled = source.controlled();
// this callback will only be invoked after controlled.request()
controlled.subscribe(n => {
  log("controlled: " + n);
  // do some work, then signal for next value
  setTimeout(() => controlled.request(1), 2500);
});
controlled.request(1);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.js"></script>

<body>
  <textarea id="output" style="width:150px; height: 150px"></textarea>
</body>
like image 40
rpisryan Avatar answered Nov 02 '22 22:11

rpisryan