I'm trying to implement a "save on type" feature for a form using RxJS v5 beta.
The data should be posted to the backend as the user types into the text fields. I'm creating a Rx.Subject
to fire new events (next()
) for new user input and post it with HTTP requests.
I've used this question as a starting point: RxJS wait until promise resolved
However, with the solution from this post, simultaneous request to the backend are sent.
My goal is to only send one request and defer following requests until a running request has completed. After completion of the request the last of the pending events should be emitted (like it is the case in debounceTime
)
The example
function in the following snippet uses the approach from the linked SO question. This sends requests for all the input values.
The workaround
function function uses a promise stored outside of the "stream" to block and wait for a previous request. This works and only sends a request for the last input value. But that seems to not follow the concept of RxJs and feels hacky.
Is there a way to achieve this with RxJS?
function fakeRequest(value) {
console.log('start request:', value)
return new Promise((resolve) => {
setTimeout(() => resolve(value), 1000);
});
}
function example() {
let subject = new Rx.Subject();
subject
.debounceTime(500)
.switchMap(input => fakeRequest(input))
.subscribe(data => console.log(data))
subject.next('example value 1');
subject.next('example value 2');
subject.next('example value 3');
subject.next('example value 4');
}
function workaround() {
let subject = new Rx.Subject();
let p = Promise.resolve();
subject
.debounceTime(500)
.switchMap(input => p.then(() => input))
.do(input => p = fakeRequest(input))
.subscribe(data => console.log(data))
subject.next('workaround value 1');
subject.next('workaround value 2');
subject.next('workaround value 3');
subject.next('workaround value 4');
}
example();
// workaround();
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>
You can use Observables with Promises and with async/await to benefit from the strengths of each of those tools.
defer allows you to create an Observable only when the Observer subscribes. It waits until an Observer subscribes to it, calls the given factory function to get an Observable -- where a factory function typically generates a new Observable -- and subscribes the Observer to this Observable.
the Promise is always asynchronous, while the Observable can be either asynchronous or synchronous, the Promise can provide a single value, whereas the Observable is a stream of values (from 0 to multiple values), you can apply RxJS operators to the Observable to get a new tailored stream.
While an Observable can do everything a Promise can, the reverse is not true. For example, an Observable can emit multiple values over time. A Promise only resolves once.
If you want to run requests in order and not discard any of them then use concat()
or concatMap()
operators. These wait until the previous Observable completes and then continue with the next one.
function fakeRequest(value) {
console.log('start request:', value)
return new Promise((resolve) => {
setTimeout(() => resolve(value), 1000);
});
}
let subject = new Subject();
subject.concatMap(value => Observable.fromPromise(fakeRequest(value)))
.subscribe(value => console.log(value));
subject.next('example value 1');
subject.next('example value 2');
subject.next('example value 3');
subject.next('example value 4');
This prints to console:
start request: example value 1
example value 1
start request: example value 2
example value 2
start request: example value 3
example value 3
start request: example value 4
example value 4
See live demo: https://jsbin.com/xaluvi/4/edit?js,console
If you wanted to ignore values then debounce
, throttle
or audit
are all good choices.
Edit: In newer RxJS versions you don't even need to use fromPromise
(or from
) and just return the Promise in concatMap
.
You can create observable from a promise and use use delayWhen operator in order to wait until that observable emits (promise resolves).
import { timer } from 'rxjs';
import { delayWhen, from } from 'rxjs/operators';
const myPromise = new Promise(resolve => setTimeout(resolve, 3000)); // will resolve in 3 sec
const waitable = timer(0, 1000).pipe(delayWhen(() => from(myPromise)));
waitable.subscribe(x => console.log(x));
Each emit should be delayed by 3 seconds.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With