TL;DR - I'm Looking for a way to control the number of HTTP requests concurrent connections to a REST API while I use RxJS.
My Node.js app will make a few thousand REST API calls to a third party provider. However, I know that if I make all those requests at once, the service might go down or reject my requests because of DDoS attack. So, I want to set the max number of concurrent connection at any given time. I used to implement concurrency control with Promises by leveraging Throat Package, but I haven't found a similar way to implement this.
I tried to use merge
with 1 for concurrence as suggested in this post How to limit the concurrency of flatMap?, but all requests are sent at once.
Here's my code:
var Rx = require('rx'),
rp = require('request-promise');
var array = ['https://httpbin.org/ip', 'https://httpbin.org/user-agent',
'https://httpbin.org/delay/3',
'https://httpbin.org/delay/3',
'https://httpbin.org/delay/3'
];
var source = Rx.Observable.fromArray(array).map(httpGet).merge(1);
function httpGet(url) {
return rp.get(url);
}
var results = [];
var subscription = source.subscribe(
function (x) {
console.log('=====', x, '======');
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
Conclusion. Although JavaScript is single-threaded, RxJS is follows the same principles as other libraries for reactive streams. We can create asynchronous streams, have some degree of concurrency and web workers even allow for parallelism.
RxJS is a library for composing asynchronous and event-based programs by using observable sequences. It provides one core type, the Observable, satellite types (Observer, Schedulers, Subjects) and operators inspired by Array#extras (map, filter, reduce, every, etc) to allow handling asynchronous events as collections.
take returns an Observable that emits only the first count values emitted by the source Observable. If the source emits fewer than count values then all of its values are emitted. After that, it completes, regardless if the source completes.
You can use the mergeMap
operator to perform the HTTP requests and to flatten the responses into the composed observable. mergeMap
takes an optional concurrent
parameter with which you can specify the maximum number of concurrently subscribed observables (i.e. HTTP requests):
let source = Rx.Observable
.fromArray(array)
.mergeMap(httpGet, 1);
Note that a mergeMap
with concurrent
specified as 1
is equivalent to concatMap
.
The reason the code in your question sends all of the requests at once is down to the calling of your httpGet
function in the map
operator. httpGet
returns a Promise and promises are not lazy - as soon as httpGet
is called, the request will be sent.
With the above code, the httpGet
will only be called in the mergeMap
implementation if there are fewer than the specified number of concurrent requests.
The code above will emit each response separately from the composed observable. If you want the responses combined into an array that is emitted when all requests have completed, you can use the toArray
operator:
let source = Rx.Observable
.fromArray(array)
.mergeMap(httpGet, 1)
.toArray();
You should also check out the recipes that Martin has referenced in his comment.
Rx.Observable.fromPromise
may be useful in your case. Expanding on cartant's answer, try this, where concurrent
is specified as 1
:
Rx.Observable.from(array)
.mergeMap(url => Rx.Observable.fromPromise(rp.get(url)), 1)
.subscribe(x => console.log(x))
For time based control, this is what I can think of:
Rx.Observable.from(array)
.bufferCount(2)
.zip(Rx.Observable.timer(0, 1000), x => x)
.mergeMap(x => Rx.Observable.from(x)
.mergeMap(url => Rx.Observable.fromPromise(rp.get(url)))
.subscribe(x => console.log(x))
Thanks for the responses above. My issue had to do with using rx instead of rxjs NPM module. After I uninstalled rx and installed rxjs all examples started to use concurrency as expected. So, http concurrent calls with Promises, Callbacks, and Native Observables worked fine.
I'm posting them here in case anyone run into similar issues and can troubleshoot.
HTTP Request Callback-Based Sample:
var Rx = require('rxjs'),
request = require('request'),
request_rx = Rx.Observable.bindCallback(request.get);
var array = [
'https://httpbin.org/ip',
'https://httpbin.org/user-agent',
'https://httpbin.org/delay/3',
'https://httpbin.org/delay/3',
'https://httpbin.org/delay/3'
];
var source = Rx.Observable.from(array).mergeMap(httpGet, 1);
function httpGet(url) {
return request_rx(url);
}
var subscription = source.subscribe(
function (x, body) {
console.log('=====', x[1].body, '======');
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
Promised-Based Sample:
var Rx = require('rxjs'),
rp = require('request-promise');
var array = ['https://httpbin.org/ip', 'https://httpbin.org/user-agent',
'https://httpbin.org/delay/3',
'https://httpbin.org/delay/3',
'https://httpbin.org/delay/3'
];
var source = Rx.Observable.from(array).mergeMap(httpGet, 1);
function httpGet(url) {
return rp.get(url);
}
var results = [];
var subscription = source.subscribe(
function (x) {
console.log('=====', x, '======');
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
Native RxJS Sample:
var Rx = require('rxjs'),
superagent = require('superagent'),
Observable = require('rxjs').Observable;
var array = [
'https://httpbin.org/ip',
'https://httpbin.org/user-agent',
'https://httpbin.org/delay/10',
'https://httpbin.org/delay/2',
'https://httpbin.org/delay/2',
'https://httpbin.org/delay/1',
];
let start = (new Date()).getTime();
var source = Rx.Observable.from(array)
.mergeMap(httpGet, null, 1)
.timestamp()
.map(stamp => [stamp.timestamp - start, stamp.value]);
function httpGet(apiUrl) {
return Observable.create((observer) => {
superagent
.get(apiUrl)
.end((err, res) => {
if (err) {
return observer.onError(err);
}
let data,
inspiration;
data = JSON.parse(res.text);
inspiration = data;
observer.next(inspiration);
observer.complete();
});
});
}
var subscription = source.subscribe(
function (x) {
console.log('=====', x, '======');
});
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With