Async.js mapLimit and its family of <name>Limit
functions basically work like a semaphore: they allow a limited number of tasks to run concurrently while the extra incoming tasks are added to a queue. The queue becomes a (cold? connected?) producer. The task runner drains an item from the queue as soon as a spot becomes available (one of its tasks finishes).
This way a limited number of concurrent tasks are always active.
How can I achieve a similar functionality in RxJS?
A combination of defer
and flatMapWithMaxConcurrent
is the RxJs way to do it:
// returns a promise
function runSomeJob(input) { ... }
function runSomeJobObservable(input) {
return Rx.Observable.defer(function () {
return runSomeJob(input);
});
}
var inputStream = // some Rx.Observable
// only allow 5 jobs to run concurrently
var outputStream = inputStream
.flatMapWithMaxConcurrent(5, runSomeJobObservable);
ouputStream.subscribe(...);
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With