Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJS equivalent of Async.js mapLimit

Tags:

Async.js mapLimit and its family of <name>Limit functions basically work like a semaphore: they allow a limited number of tasks to run concurrently while the extra incoming tasks are added to a queue. The queue becomes a (cold? connected?) producer. The task runner drains an item from the queue as soon as a spot becomes available (one of its tasks finishes).

This way a limited number of concurrent tasks are always active.

How can I achieve a similar functionality in RxJS?

like image 812
homam Avatar asked Mar 27 '16 19:03

homam


1 Answers

A combination of defer and flatMapWithMaxConcurrent is the RxJs way to do it:

// returns a promise
function runSomeJob(input) { ... }

function runSomeJobObservable(input) {
    return Rx.Observable.defer(function () {
        return runSomeJob(input);
    });
}

var inputStream = // some Rx.Observable

// only allow 5 jobs to run concurrently
var outputStream = inputStream
    .flatMapWithMaxConcurrent(5, runSomeJobObservable);

ouputStream.subscribe(...);
like image 106
Brandon Avatar answered Nov 15 '22 04:11

Brandon