With promises, it's really easy to implement a queue to prevent for example multiple HTTP requests from running in parallel:
class Runner {
private promise;
constructor(http) {
this.promise = q.resolve();
}
getUrl() {
return this.promise = this.promise.then(() => http.get('http://someurl'))
}
}
var runner = new Runner(http);
var lastPromise;
for (var i = 0; i < 10; i++) {
lastPromise = runner.getUrl();
}
lastPromise.then(() => console.log("job's done!");
I can't figure out how to do this in Rxjs tho. If I try something similar to the above, all previous HTTP calls get repeated when I add a request because it just adds to the stream and reruns the whole thing.
I read something about a queue scheduler, but that doesn't seem to exist (anymore)?
You can use concat like @cartant suggested:
const urlQueue = Observable.fromPromise(http.get('http://someurl'))
.concat(Observable.fromPromise(http.get('http://someurl')))
.concat(Observable.fromPromise(http.get('http://someurl')));
But you would need to construct such a stream before subscribing and letting the queue handle it. Also; fromPromise
is still eager so your promises will all start running directly when you invoke above code. To solve this you would need to use Defer()
:
const urls = [
'http://someurl',
'http://someurl',
'http://someurl',
];
const queue = urls
.map(url => Observable.defer(() => http.get(url))
.reduce((acc, curr) => acc.concat(curr));
This approach uses the native array map
to convert the urls to Observables and then uses reduce
to concat them all together into one big stream.
A better solution would be to get your url's into a stream and then use mergeMap
with a concurrency appended to it:
const urls = [
'http://someurl',
'http://someurl',
'http://someurl',
];
const queuedGets = Observable.from(urls)
.mergeMap(url => http.get(url), null, 1);
This will result in the urls being retrieved one by one after the previous one has completed but you still need to have all urls ready before starting. Depending on your usecase this might suffice. Note that a mergeMap
with concurrency set to 1
is the equivalent of just using concatMap
The last part of the puzzle maybe is that you need to push new urls into your queue in your own tempo. To do so you would need a Subject
A Subject is like an Observable, but can multicast to many Observers. Subjects are like EventEmitters: they maintain a registry of many listeners.
class HttpGetQueue {
const queue = new Subject();
constructor() {
public results = queue
.mergeMap(url => http.get(url), null, 1);
}
addToQueue(url) {
queue.next(url);
}
}
const instance = new HttpGetQueue();
instance.results.subscribe(res => console.log('got res: ' + res);
instance.addToQueue('http://someurl');
instance.addToQueue('http://someurl');
instance.addToQueue('http://someurl');
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With