
How can I implement a queue using Rxjs?

Tags:

rxjs

With promises, it's really easy to implement a queue that prevents, for example, multiple HTTP requests from running in parallel:

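class Runner {
  private promise;
  constructor(private http) {
    // start from an already-resolved promise so the first request runs immediately
    this.promise = Promise.resolve();
  }
  getUrl() {
    // chain each request onto the previous one so they run one at a time
    return this.promise = this.promise.then(() => this.http.get('http://someurl'));
  }
}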

var runner = new Runner(http);

var lastPromise;
for (var i = 0; i < 10; i++) {
  lastPromise = runner.getUrl();
}

lastPromise.then(() => console.log("job's done!"));

I can't figure out how to do this in RxJS, though. If I try something similar to the above, all previous HTTP calls get repeated when I add a request, because it just adds to the stream and reruns the whole thing.

I read something about a queue scheduler, but that doesn't seem to exist (anymore)?

Bart van den Burg asked Oct 03 '17 06:10





1 Answer

You can use concat like @cartant suggested:

const urlQueue = Observable.fromPromise(http.get('http://someurl'))
  .concat(Observable.fromPromise(http.get('http://someurl')))
  .concat(Observable.fromPromise(http.get('http://someurl')));

But you would need to construct such a stream before subscribing and letting the queue handle it. Also, the promises themselves are eager: fromPromise only wraps a promise that already exists, so all the requests start running as soon as the code above is invoked. To solve this you would need to use defer():

const urls = [
  'http://someurl',
  'http://someurl',
  'http://someurl',
];

const queue = urls
  .map(url => Observable.defer(() => http.get(url)))
  .reduce((acc, curr) => acc.concat(curr));

This approach uses the native array map to convert the urls to Observables and then uses reduce to concat them all together into one big stream.
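
The eager-versus-lazy difference that makes defer necessary can be seen without RxJS at all. The following is only a minimal sketch in plain JavaScript: `fakeGet` is a hypothetical stand-in for `http.get`, and the `started` counter exists purely for illustration. Calling a promise-returning function starts the work right away, while wrapping the call in a function (which is roughly what defer does per subscription) postpones it until that function is invoked.

```javascript
// Hypothetical stand-in for http.get: counts how many requests have started.
let started = 0;
function fakeGet(url) {
  started += 1; // the "request" begins the moment this function is called
  return Promise.resolve('response from ' + url);
}

const urls = ['http://someurl', 'http://someurl', 'http://someurl'];

// Lazy: each call is wrapped in a function, so nothing has started yet.
const lazyRequests = urls.map(url => () => fakeGet(url));
const startedAfterLazy = started;

// Eager: calling fakeGet directly starts every request immediately.
const eagerRequests = urls.map(url => fakeGet(url));
const startedAfterEager = started;

console.log(startedAfterLazy, startedAfterEager); // prints: 0 3
```

A queue can only run requests one at a time if it decides when each one starts, which is why the lazy form is the one to hand to concat.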

A better solution would be to get your URLs into a stream and then use mergeMap with a concurrency limit:

const urls = [
  'http://someurl',
  'http://someurl',
  'http://someurl',
];

const queuedGets = Observable.from(urls)
  .mergeMap(url => http.get(url), null, 1);

This will result in the URLs being retrieved one by one, each after the previous one has completed, but you still need to have all URLs ready before starting. Depending on your use case this might suffice. Note that a mergeMap with concurrency set to 1 is equivalent to just using concatMap.

The last piece of the puzzle may be that you need to push new URLs into your queue at your own pace. To do so you would need a Subject.

A Subject is like an Observable, but can multicast to many Observers. Subjects are like EventEmitters: they maintain a registry of many listeners.

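class HttpGetQueue {
  // URLs pushed here are fetched in the order they arrive
  private queue = new Subject();
  public results;

  constructor() {
    // concurrency of 1: the next request starts only after the previous one completes
    this.results = this.queue
      .mergeMap(url => http.get(url), null, 1);
  }

  addToQueue(url) {
    this.queue.next(url);
  }
}

const instance = new HttpGetQueue();
// subscribe before adding URLs: a plain Subject does not replay earlier values
instance.results.subscribe(res => console.log('got res: ' + res));
instance.addToQueue('http://someurl');
instance.addToQueue('http://someurl');
instance.addToQueue('http://someurl');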
Mark van Straten answered Jan 02 '23 09:01
