
Wait for a promise to resolve with RxJs

I'm trying to implement a "save on type" feature for a form using RxJS v5 beta.

The data should be posted to the backend as the user types into the text fields. I'm creating an Rx.Subject that fires a new event (next()) for each user input, and each event is posted with an HTTP request.

I've used this question as a starting point: RxJS wait until promise resolved

However, with the solution from that post, requests are sent to the backend simultaneously.

My goal is to send only one request at a time and defer subsequent requests until the running request has completed. After the request completes, only the last of the pending events should be emitted (as is the case with debounceTime).

The example function in the following snippet uses the approach from the linked SO question. This sends requests for all the input values.

The workaround function uses a promise stored outside of the "stream" to block and wait for a previous request. This works and only sends a request for the last input value. But it doesn't seem to follow the concepts of RxJS and feels hacky.

Is there a way to achieve this with RxJS?

function fakeRequest(value) {
  console.log('start request:', value)
  return new Promise((resolve) => { 
    setTimeout(() => resolve(value), 1000);
 });
}

function example() {
  let subject = new Rx.Subject();
  
  subject
    .debounceTime(500)
    .switchMap(input => fakeRequest(input))
    .subscribe(data => console.log(data))

  subject.next('example value 1');
  subject.next('example value 2');
  subject.next('example value 3');
  subject.next('example value 4');
}


function workaround() {
  let subject = new Rx.Subject();

  let p = Promise.resolve();
  subject
    .debounceTime(500)  
    .switchMap(input => p.then(() => input))
    .do(input => p = fakeRequest(input))
    .subscribe(data => console.log(data))

  subject.next('workaround value 1');
  subject.next('workaround value 2');
  subject.next('workaround value 3');
  subject.next('workaround value 4');
}

example();
// workaround();
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>
asked Nov 14 '16 11:11 by Stefan



2 Answers

If you want to run requests in order and not discard any of them then use concat() or concatMap() operators. These wait until the previous Observable completes and then continue with the next one.

function fakeRequest(value) {
  console.log('start request:', value)
  return new Promise((resolve) => { 
    setTimeout(() => resolve(value), 1000);
 });
}

// Assumes the global Rx object (RxJS 5) loaded by the script tag in the question.
let subject = new Rx.Subject();
subject.concatMap(value => Rx.Observable.fromPromise(fakeRequest(value)))
  .subscribe(value => console.log(value));

subject.next('example value 1');
subject.next('example value 2');
subject.next('example value 3');
subject.next('example value 4');

This prints to console:

start request: example value 1
example value 1
start request: example value 2
example value 2
start request: example value 3
example value 3
start request: example value 4
example value 4

See live demo: https://jsbin.com/xaluvi/4/edit?js,console

If you wanted to ignore values then debounce, throttle or audit are all good choices.

Edit: In newer RxJS versions you don't need fromPromise (or from) at all; you can return the Promise directly from the concatMap callback.

answered Sep 25 '22 02:09 by martin


You can create an Observable from a promise and use the delayWhen operator to wait until that Observable emits (that is, until the promise resolves).

import { from, timer } from 'rxjs'; // from is a creation function, not an operator
import { delayWhen } from 'rxjs/operators';

const myPromise = new Promise(resolve => setTimeout(resolve, 3000)); // will resolve in 3 sec


const waitable = timer(0, 1000).pipe(delayWhen(() => from(myPromise)));
waitable.subscribe(x => console.log(x));

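Because myPromise is created only once, the values emitted before it resolves are held back until it resolves (about 3 seconds after it was created). Values emitted after that point pass through without a noticeable delay.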

answered Sep 25 '22 02:09 by cuddlemeister