Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RXJS control observable invocation

I use RxJs version 5 within my Angular 2 project. I want to create some observables but I don't want the observables being invoked immediately.

In version 4 you could control the invocation with (for example) the Controlled command or Pausable Buffers. But that functionality is not (yet) available in version 5.

How can I get the this kind of functionality in RxJs 5?

My ultimate goal is to queue the created observables and invoke them one by one. The next one is only invoked when the previous one is processed successfully. When one fails, the queue is emptied.

EDIT

With the the comment of @Niklas Fasching I could create a working solution with the Publish operation.

JS Bin

// Queue to queue operations
const queue = [];

// Just a function to create Observers
function createObserver(id): Observer {
    return {
        next: function (x) {
            console.log('Next: ' + id + x);
        },
        error: function (err) {
            console.log('Error: ' + err);
        },
        complete: function () {
            console.log('Completed');
        }
    };
};

// Creates an async operation and add it to the queue
function createOperation(name: string): Observable {

  console.log('add ' + name);
  // Create an async operation
  var observable = Rx.Observable.create(observer => {
    // Some async operation
    setTimeout(() => 
               observer.next(' Done'), 
               500);
  });
  // Hold the operation
  var published = observable.publish();
  // Add Global subscribe
  published.subscribe(createObserver('Global'));
  // Add it to the queue
  queue.push(published);
  // Return the published so the caller could add a subscribe
  return published;
};

// Create 4 operations on hold
createOperation('SourceA').subscribe(createObserver('SourceA'));
createOperation('SourceB').subscribe(createObserver('SourceB'));
createOperation('SourceC').subscribe(createObserver('SourceC'));
createOperation('SourceD').subscribe(createObserver('SourceD'));

// Dequeue and run the first
queue.shift().connect();
like image 798
Justin Avatar asked Feb 11 '16 15:02

Justin


Video Answer


2 Answers

With Rx4's controlled Observable is still invoked when you subscribe

The controlled operator in RxJS 4 was really just controlling the flow of the Observable after the operator. Up to that point, it all pumps through and buffers at that operator. Consider this:

(RxJS 4) http://jsbin.com/yaqabe/1/edit?html,js,console

const source = Rx.Observable.range(0, 5).do(x => console.log('do' + x)).controlled();

source.subscribe(x => console.log(x));

setTimeout(() => {
  console.log('requesting');
  source.request(2);
}, 1000);

You'll notice all five values from the Observable.range(0, 5) are emitted by the do immediately... then a one second (1000ms) pause before you get your two values.

So, it's really the illusion of backpressure control. In the end, there's an unbounded buffer in that operator. An array that is collecting everything that the Observable "above" it is sending down and waiting for you to dequeue it by calling request(n).


RxJS 5.0.0-beta.2 replicating controlled

At the time of this answer, the controlled operator does not exist in RxJS 5. This is for a few reasons: 1. No requests for it, and 2. Its name is clearly confusing (hence this question on StackOverflow)

How to replicate the behavior in RxJS 5 (for now): http://jsbin.com/metuyab/1/edit?html,js,console

// A subject we'll use to zip with the source
const controller = new Rx.Subject();

// A request function to next values into the subject
function request(count) {
  for (let i = 0; i < count; i++) {
    controller.next(count);
  }
}

// We'll zip our source with the subject, we don't care about what
// comes out of the Subject, so we'll drop that.
const source = Rx.Observable.range(0, 5).zip(controller, (x, _) => x);

// Same effect as above Rx 4 example
source.subscribe(x => console.log(x));

// Same effect as above Rx 4 example
request(3);

Backpressure control

For "real backpressure control" right now, one solution is an iterator of promise. IoP isn't without its problems though, for one thing, there's an object allocation at each turn. Every value has a Promise associated to it. For another thing, cancellation isn't there, because it's promises.

A better, Rx-based approach is to have a Subject that "feeds" the top of your observable chain, and you compose in the rest.

Something like this: http://jsbin.com/qeqaxo/2/edit?js,console

// start with 5 values
const controller = new Rx.BehaviorSubject(5);

// some observable source, in this case, an interval.
const source = Rx.Observable.interval(100)

const controlled = controller.flatMap(
      // map your count into a set of values
      (count) => source.take(count), 
      // additional mapping for metadata about when the block is done
      (count, value, _, index) => {
        return { value: value, done: count - index === 1 }; 
      })
      // when the block is done, request 5 more.
      .do(({done}) => done && controller.next(5))
      // we only care about the value for output
      .map(({value}) => value);


// start our subscription
controlled.subscribe(x => {
  console.log(x)
});

... we have some plans for a flowable observable type with real backpressure control in the near future, too. That will be more exciting and much better for this sort of scenario.

like image 179
Ben Lesh Avatar answered Sep 19 '22 08:09

Ben Lesh


You can seperate the start of the observable from subscription to it by publishing the observable. The published observable will only be started after calling connect on it.

Note that all subscribers will share a single subscription to the observable sequence.

var published = Observable.of(42).publish();
// subscription does not start the observable sequence
published.subscribe(value => console.log('received: ', value));
// connect starts the sequence; subscribers will now receive values
published.connect();
like image 36
Niklas Fasching Avatar answered Sep 18 '22 08:09

Niklas Fasching