I have an RxJS sequence being consumed in the normal manner...
However, in the observable 'onNext' handler, some of the operations will complete synchronously, but others require async callbacks, that need to be waited on before processing the next item in the input sequence.
...little bit confused how to do this. Any ideas? thanks!
someObservable.subscribe( function onNext(item) { if (item == 'do-something-async-and-wait-for-completion') { setTimeout( function() { console.log('okay, we can continue'); } , 5000 ); } else { // do something synchronously and keep on going immediately console.log('ready to go!!!'); } }, function onError(error) { console.log('error'); }, function onComplete() { console.log('complete'); } );
You can use Observables with Promises and with async/await to benefit from the strengths of each of those tools.
An observable produces values over time. An array is created as a static set of values. In a sense, observables are asynchronous where arrays are synchronous. In the following examples, → implies asynchronous value delivery.
Observables Can Emit Data and Notifications Synchronously Observables can also emit data and notifications synchronously. The observable function above produces data synchronously.
RxJS is a library for composing asynchronous and event-based programs by using observable sequences. It provides one core type, the Observable, satellite types (Observer, Schedulers, Subjects) and operators inspired by Array#extras (map, filter, reduce, every, etc) to allow handling asynchronous events as collections.
Each operation you want to perform can be modeled as an observable. Even the synchronous operation can be modeled this way. Then you can use map
to convert your sequence into a sequence of sequences, then use concatAll
to flatten the sequence.
someObservable .map(function (item) { if (item === "do-something-async") { // create an Observable that will do the async action when it is subscribed // return Rx.Observable.timer(5000); // or maybe an ajax call? Use `defer` so that the call does not // start until concatAll() actually subscribes. return Rx.Observable.defer(function () { return Rx.Observable.ajaxAsObservable(...); }); } else { // do something synchronous but model it as an async operation (using Observable.return) // Use defer so that the sync operation is not carried out until // concatAll() reaches this item. return Rx.Observable.defer(function () { return Rx.Observable.return(someSyncAction(item)); }); } }) .concatAll() // consume each inner observable in sequence .subscribe(function (result) { }, function (error) { console.log("error", error); }, function () { console.log("complete"); });
To reply to some of your comments...at some point you need to force some expectations on the stream of functions. In most languages, when dealing with functions that are possibly async, the function signatures are async and the actual async vs sync nature of the function is hidden as an implementation detail of the function. This is true whether you are using javaScript promises, Rx observables, c# Tasks, c++ Futures, etc. The functions end up returning a promise/observable/task/future/etc and if the function is actually synchronous, then the object it returns is just already completed.
Having said that, since this is JavaScript, you can cheat:
var makeObservable = function (func) { return Rx.Observable.defer(function () { // execute the function and then examine the returned value. // if the returned value is *not* an Rx.Observable, then // wrap it using Observable.return var result = func(); return result instanceof Rx.Observable ? result: Rx.Observable.return(result); }); } someObservable .map(makeObservable) .concatAll() .subscribe(function (result) { }, function (error) { console.log("error", error); }, function () { console.log("complete"); });
First of all, move your async operations out of subscribe
, it's not made for async operations.
What you can use is mergeMap
(alias flatMap
) or concatMap
. (I am mentioning both of them, but concatMap
is actually mergeMap
with the concurrent
parameter set to 1.) Settting a different concurrent parameter is useful, as sometimes you would want to limit the number of concurrent queries, but still run a couple concurrent.
source.concatMap(item => { if (item == 'do-something-async-and-wait-for-completion') { return Rx.Observable.timer(5000) .mapTo(item) .do(e => console.log('okay, we can continue')); } else { // do something synchronously and keep on going immediately return Rx.Observable.of(item) .do(e => console.log('ready to go!!!')); } }).subscribe();
I will also show how you can rate limit your calls. Word of advice: Only rate limit at the point where you actually need it, like when calling an external API that allows only a certain number of requests per second or minutes. Otherwise it is better to just limit the number of concurrent operations and let the system move at maximal velocity.
We start with the following snippet:
const concurrent; const delay; source.mergeMap(item => selector(item, delay) , concurrent)
Next, we need to pick values for concurrent
, delay
and implement selector
. concurrent
and delay
are closely related. For example, if we want to run 10 items per second, we can use concurrent = 10
and delay = 1000
(millisecond), but also concurrent = 5
and delay = 500
or concurrent = 4
and delay = 400
. The number of items per second will always be concurrent / (delay / 1000)
.
Now lets implement selector
. We have a couple of options. We can set an minimal execution time for selector
, we can add a constant delay to it, we can emit the results as soon as they are available, we can can emit the result only after the minimal delay has passed etc. It is even possible to add an timeout by using the timeout
operators. Convenience.
Set minimal time, send result early:
function selector(item, delay) { return Rx.Observable.of(item) .delay(1000) // replace this with your actual call. .merge(Rx.Observable.timer(delay).ignoreElements()) }
Set minimal time, send result late:
function selector(item, delay) { return Rx.Observable.of(item) .delay(1000) // replace this with your actual call. .zip(Rx.Observable.timer(delay), (item, _)) }
Add time, send result early:
function selector(item, delay) { return Rx.Observable.of(item) .delay(1000) // replace this with your actual call. .concat(Rx.Observable.timer(delay).ignoreElements()) }
Add time, send result late:
function selector(item, delay) { return Rx.Observable.of(item) .delay(1000) // replace this with your actual call. .delay(delay) }
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With