Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Wait for an async operation in onNext of RxJS Observable

I have an RxJS sequence being consumed in the normal manner...

However, in the observable 'onNext' handler, some of the operations will complete synchronously, but others require async callbacks, that need to be waited on before processing the next item in the input sequence.

...little bit confused how to do this. Any ideas? thanks!

someObservable.subscribe(     function onNext(item)     {         if (item == 'do-something-async-and-wait-for-completion')         {             setTimeout(                 function()                 {                     console.log('okay, we can continue');                 }                 , 5000             );         }         else         {             // do something synchronously and keep on going immediately             console.log('ready to go!!!');         }     },     function onError(error)     {         console.log('error');     },     function onComplete()     {         console.log('complete');     } ); 
like image 992
user3291110 Avatar asked Feb 19 '14 09:02

user3291110


People also ask

Can I use async await with observable?

You can use Observables with Promises and with async/await to benefit from the strengths of each of those tools.

Is observable sync or async?

An observable produces values over time. An array is created as a static set of values. In a sense, observables are asynchronous where arrays are synchronous. In the following examples, → implies asynchronous value delivery.

Can we make observable synchronous?

Observables Can Emit Data and Notifications Synchronously Observables can also emit data and notifications synchronously. The observable function above produces data synchronously.

Is RxJS asynchronous?

RxJS is a library for composing asynchronous and event-based programs by using observable sequences. It provides one core type, the Observable, satellite types (Observer, Schedulers, Subjects) and operators inspired by Array#extras (map, filter, reduce, every, etc) to allow handling asynchronous events as collections.


2 Answers

Each operation you want to perform can be modeled as an observable. Even the synchronous operation can be modeled this way. Then you can use map to convert your sequence into a sequence of sequences, then use concatAll to flatten the sequence.

someObservable     .map(function (item) {         if (item === "do-something-async") {             // create an Observable that will do the async action when it is subscribed             // return Rx.Observable.timer(5000);              // or maybe an ajax call?  Use `defer` so that the call does not             // start until concatAll() actually subscribes.             return Rx.Observable.defer(function () { return Rx.Observable.ajaxAsObservable(...); });         }         else {             // do something synchronous but model it as an async operation (using Observable.return)             // Use defer so that the sync operation is not carried out until             // concatAll() reaches this item.             return Rx.Observable.defer(function () {                 return Rx.Observable.return(someSyncAction(item));             });         }     })     .concatAll() // consume each inner observable in sequence     .subscribe(function (result) {     }, function (error) {         console.log("error", error);     }, function () {         console.log("complete");     }); 

To reply to some of your comments...at some point you need to force some expectations on the stream of functions. In most languages, when dealing with functions that are possibly async, the function signatures are async and the actual async vs sync nature of the function is hidden as an implementation detail of the function. This is true whether you are using javaScript promises, Rx observables, c# Tasks, c++ Futures, etc. The functions end up returning a promise/observable/task/future/etc and if the function is actually synchronous, then the object it returns is just already completed.

Having said that, since this is JavaScript, you can cheat:

var makeObservable = function (func) {     return Rx.Observable.defer(function () {         // execute the function and then examine the returned value.         // if the returned value is *not* an Rx.Observable, then         // wrap it using Observable.return         var result = func();         return result instanceof Rx.Observable ? result: Rx.Observable.return(result);     }); }  someObservable     .map(makeObservable)     .concatAll()     .subscribe(function (result) {     }, function (error) {         console.log("error", error);     }, function () {         console.log("complete");     }); 
like image 148
Brandon Avatar answered Oct 02 '22 05:10

Brandon


First of all, move your async operations out of subscribe, it's not made for async operations.

What you can use is mergeMap (alias flatMap) or concatMap. (I am mentioning both of them, but concatMap is actually mergeMap with the concurrent parameter set to 1.) Settting a different concurrent parameter is useful, as sometimes you would want to limit the number of concurrent queries, but still run a couple concurrent.

source.concatMap(item => {   if (item == 'do-something-async-and-wait-for-completion') {     return Rx.Observable.timer(5000)       .mapTo(item)       .do(e => console.log('okay, we can continue'));     } else {       // do something synchronously and keep on going immediately       return Rx.Observable.of(item)         .do(e => console.log('ready to go!!!'));     } }).subscribe(); 

I will also show how you can rate limit your calls. Word of advice: Only rate limit at the point where you actually need it, like when calling an external API that allows only a certain number of requests per second or minutes. Otherwise it is better to just limit the number of concurrent operations and let the system move at maximal velocity.

We start with the following snippet:

const concurrent; const delay; source.mergeMap(item =>   selector(item, delay) , concurrent) 

Next, we need to pick values for concurrent, delay and implement selector. concurrent and delay are closely related. For example, if we want to run 10 items per second, we can use concurrent = 10 and delay = 1000 (millisecond), but also concurrent = 5 and delay = 500 or concurrent = 4 and delay = 400. The number of items per second will always be concurrent / (delay / 1000).

Now lets implement selector. We have a couple of options. We can set an minimal execution time for selector, we can add a constant delay to it, we can emit the results as soon as they are available, we can can emit the result only after the minimal delay has passed etc. It is even possible to add an timeout by using the timeout operators. Convenience.

Set minimal time, send result early:

function selector(item, delay) {    return Rx.Observable.of(item)      .delay(1000) // replace this with your actual call.      .merge(Rx.Observable.timer(delay).ignoreElements()) } 

Set minimal time, send result late:

function selector(item, delay) {    return Rx.Observable.of(item)      .delay(1000) // replace this with your actual call.      .zip(Rx.Observable.timer(delay), (item, _)) } 

Add time, send result early:

function selector(item, delay) {    return Rx.Observable.of(item)      .delay(1000) // replace this with your actual call.      .concat(Rx.Observable.timer(delay).ignoreElements()) } 

Add time, send result late:

function selector(item, delay) {    return Rx.Observable.of(item)      .delay(1000) // replace this with your actual call.      .delay(delay) } 
like image 44
Dorus Avatar answered Oct 02 '22 07:10

Dorus