Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is there a way to manage concurrency with RxJS?

Tags:

node.js

rxjs

TL;DR - I'm looking for a way to control the number of concurrent HTTP requests to a REST API while using RxJS.

My Node.js app will make a few thousand REST API calls to a third-party provider. However, I know that if I make all those requests at once, the service might go down or reject my requests because it mistakes them for a DDoS attack. So, I want to set the maximum number of concurrent connections at any given time. I used to implement concurrency control with Promises by leveraging the Throat package, but I haven't found a similar way to implement this with RxJS.

I tried to use merge with a concurrency of 1, as suggested in the post "How to limit the concurrency of flatMap?", but all requests are still sent at once.

Here's my code:

var Rx = require('rx'),
  rp = require('request-promise');

// URLs to request.
var array = ['https://httpbin.org/ip', 'https://httpbin.org/user-agent',
  'https://httpbin.org/delay/3',
  'https://httpbin.org/delay/3',
  'https://httpbin.org/delay/3'
];

// Attempted concurrency limit: map every URL through httpGet, then merge(1).
// NOTE: map calls httpGet itself, so rp.get(url) has already been invoked for
// an element by the time merge(1) receives the value httpGet returned.
var source = Rx.Observable.fromArray(array).map(httpGet).merge(1);

// Issues a GET for `url` through request-promise and returns whatever
// rp.get returns.
function httpGet(url) {
  return rp.get(url);
}

// NOTE: `results` is never read or written anywhere in this snippet.
var results = [];
// Log each emitted value, any error, and completion of the stream.
var subscription = source.subscribe(
  function (x) {
    console.log('=====', x, '======');
  },
  function (err) {
    console.log('Error: ' + err);
  },
  function () {
    console.log('Completed');
  });
like image 241
Diego Avatar asked Mar 26 '17 09:03

Diego


People also ask

Is RxJS multithreaded?

Conclusion. Although JavaScript is single-threaded, RxJS follows the same principles as other libraries for reactive streams. We can create asynchronous streams, have some degree of concurrency and web workers even allow for parallelism.

Is RxJS async?

RxJS is a library for composing asynchronous and event-based programs by using observable sequences. It provides one core type, the Observable, satellite types (Observer, Schedulers, Subjects) and operators inspired by Array#extras (map, filter, reduce, every, etc) to allow handling asynchronous events as collections.

What is take in RxJS?

take returns an Observable that emits only the first count values emitted by the source Observable. If the source emits fewer than count values then all of its values are emitted. After that, it completes, regardless if the source completes.


Video Answer


3 Answers

You can use the mergeMap operator to perform the HTTP requests and to flatten the responses into the composed observable. mergeMap takes an optional concurrent parameter with which you can specify the maximum number of concurrently subscribed observables (i.e. HTTP requests):

// Call httpGet for each URL, with at most one request in flight at a time
// (the second mergeMap argument is the concurrency limit).
// Observable.from is used instead of the RxJS 4 `fromArray` so the snippet
// runs on the `rxjs` (v5) package, which is where mergeMap comes from.
let source = Rx.Observable
  .from(array)
  .mergeMap(httpGet, 1);

Note that a mergeMap with concurrent specified as 1 is equivalent to concatMap.

The reason the code in your question sends all of the requests at once is down to the calling of your httpGet function in the map operator. httpGet returns a Promise and promises are not lazy - as soon as httpGet is called, the request will be sent.

With the above code, the httpGet will only be called in the mergeMap implementation if there are fewer than the specified number of concurrent requests.

The code above will emit each response separately from the composed observable. If you want the responses combined into an array that is emitted when all requests have completed, you can use the toArray operator:

// Same pipeline as before, but toArray() collects every response and emits
// them as a single array once all requests have completed.
// Observable.from is used instead of the RxJS 4 `fromArray` so the snippet
// runs on the `rxjs` (v5) package, which is where mergeMap comes from.
let source = Rx.Observable
  .from(array)
  .mergeMap(httpGet, 1)
  .toArray();

You should also check out the recipes that Martin has referenced in his comment.

like image 97
cartant Avatar answered Oct 03 '22 09:10

cartant


Rx.Observable.fromPromise may be useful in your case. Expanding on cartant's answer, try this, where concurrent is specified as 1:

// Request the URLs with a concurrency of 1: the promise for a URL is only
// created when mergeMap invokes the projection for that URL.
Rx.Observable
  .from(array)
  .mergeMap((url) => {
    const pending = rp.get(url);
    return Rx.Observable.fromPromise(pending);
  }, 1)
  .subscribe((x) => {
    console.log(x);
  });

For time based control, this is what I can think of:

// Time-based throttling: group the URLs in pairs, pair each group with a
// tick of a 1-second timer, then fire the requests of that group.
Rx.Observable.from(array)
  .bufferCount(2)
  // zip holds each pair back until the timer emits its next tick.
  .zip(Rx.Observable.timer(0, 1000), x => x)
  // The outer mergeMap must be closed before .subscribe; the original
  // snippet was missing this closing parenthesis and did not parse.
  .mergeMap(x => Rx.Observable.from(x)
    .mergeMap(url => Rx.Observable.fromPromise(rp.get(url))))
  .subscribe(x => console.log(x))
like image 28
user3587412 Avatar answered Oct 03 '22 09:10

user3587412


Thanks for the responses above. My issue had to do with using the rx NPM module instead of rxjs. After I uninstalled rx and installed rxjs, all examples started to limit concurrency as expected. So, concurrent HTTP calls with Promises, callbacks, and native Observables worked fine.

I'm posting them here in case anyone runs into similar issues and needs to troubleshoot.

HTTP Request Callback-Based Sample:

var Rx = require('rxjs'),
  request = require('request'),
  request_rx = Rx.Observable.bindCallback(request.get);

// URLs to request.
var array = [
  'https://httpbin.org/ip',
  'https://httpbin.org/user-agent',
  'https://httpbin.org/delay/3',
  'https://httpbin.org/delay/3',
  'https://httpbin.org/delay/3'
];

// The second mergeMap argument (1) caps how many httpGet observables are
// subscribed at once, i.e. how many requests are in flight.
var source = Rx.Observable.from(array).mergeMap(httpGet, 1);

// Returns an observable for a GET of `url`. request_rx is built with
// bindCallback, so the node-style callback arguments (err, response, body)
// arrive together as one array value instead of err being raised as an error.
function httpGet(url) {
  return request_rx(url);
}

var subscription = source.subscribe(
  function (x) {
    // x is [err, response, body]. A failed request shows up here, not in the
    // error handler, so check err before touching the response.
    var err = x[0];
    var response = x[1];
    if (err) {
      console.log('Error: ' + err);
      return;
    }
    console.log('=====', response.body, '======');
  },
  function (err) {
    console.log('Error: ' + err);
  },
  function () {
    console.log('Completed');
  });

Promise-Based Sample:

var Rx = require('rxjs'),
  rp = require('request-promise');

// Endpoints to request.
var array = [
  'https://httpbin.org/ip',
  'https://httpbin.org/user-agent',
  'https://httpbin.org/delay/3',
  'https://httpbin.org/delay/3',
  'https://httpbin.org/delay/3'
];

// mergeMap invokes httpGet per URL; the 1 is the concurrency limit.
var source = Rx.Observable
  .from(array)
  .mergeMap(httpGet, 1);

// Issues a GET through request-promise and hands the resulting promise
// back to mergeMap.
function httpGet(url) {
  var pendingResponse = rp.get(url);
  return pendingResponse;
}

var results = [];

// Print every response, any error, and a final completion marker.
var subscription = source.subscribe(
  (x) => {
    console.log('=====', x, '======');
  },
  (err) => {
    console.log('Error: ' + err);
  },
  () => {
    console.log('Completed');
  }
);

Native RxJS Sample:

var Rx = require('rxjs'),
  superagent = require('superagent'),
  Observable = require('rxjs').Observable;

// Endpoints to request.
var array = [
  'https://httpbin.org/ip',
  'https://httpbin.org/user-agent',
  'https://httpbin.org/delay/10',
  'https://httpbin.org/delay/2',
  'https://httpbin.org/delay/2',
  'https://httpbin.org/delay/1',
];

// Reference point (ms since epoch) for the elapsed times reported below.
let start = Date.now();

// One request at a time (third mergeMap argument); each result is emitted
// as [milliseconds since start, response value].
var source = Rx.Observable
    .from(array)
    .mergeMap(httpGet, null, 1)
    .timestamp()
    .map(({ timestamp, value }) => [timestamp - start, value]);

/**
 * Wraps a superagent GET request in an Observable.
 *
 * The request is issued from inside the Observable.create callback. On
 * success the response text is parsed as JSON, emitted as a single value,
 * and the observable completes. A request failure or a JSON parse failure
 * is delivered through the observer's error channel.
 *
 * @param {string} apiUrl - URL to request.
 * @returns {Observable} Observable of the parsed JSON response body.
 */
function httpGet(apiUrl) {
  return Observable.create((observer) => {
    superagent
      .get(apiUrl)
      .end((err, res) => {
        if (err) {
          // RxJS 5 observers expose error(); onError() is the RxJS 4 name
          // and is undefined here.
          return observer.error(err);
        }
        let data;
        try {
          data = JSON.parse(res.text);
        } catch (parseErr) {
          // Report a malformed body through the stream instead of throwing
          // from inside the superagent callback.
          return observer.error(parseErr);
        }
        observer.next(data);
        observer.complete();
      });
  });
}

// Log each [elapsedMs, value] pair. An error handler is supplied, as in the
// callback- and promise-based samples, so a failed request is logged instead
// of surfacing as an unhandled error.
var subscription = source.subscribe(
  function (x) {
    console.log('=====', x, '======');
  },
  function (err) {
    console.log('Error: ' + err);
  });
like image 36
Diego Avatar answered Oct 03 '22 08:10

Diego