I use RxJs version 5 within my Angular 2 project. I want to create some observables but I don't want the observables being invoked immediately.
In version 4 you could control the invocation with (for example) the Controlled command or Pausable Buffers. But that functionality is not (yet) available in version 5.
How can I get the this kind of functionality in RxJs 5?
My ultimate goal is to queue the created observables and invoke them one by one. The next one is only invoked when the previous one is processed successfully. When one fails, the queue is emptied.
EDIT
With the the comment of @Niklas Fasching I could create a working solution with the Publish operation.
JS Bin
// Queue to queue operations
const queue = [];
// Just a function to create Observers
function createObserver(id): Observer {
return {
next: function (x) {
console.log('Next: ' + id + x);
},
error: function (err) {
console.log('Error: ' + err);
},
complete: function () {
console.log('Completed');
}
};
};
// Creates an async operation and add it to the queue
function createOperation(name: string): Observable {
console.log('add ' + name);
// Create an async operation
var observable = Rx.Observable.create(observer => {
// Some async operation
setTimeout(() =>
observer.next(' Done'),
500);
});
// Hold the operation
var published = observable.publish();
// Add Global subscribe
published.subscribe(createObserver('Global'));
// Add it to the queue
queue.push(published);
// Return the published so the caller could add a subscribe
return published;
};
// Create 4 operations on hold
createOperation('SourceA').subscribe(createObserver('SourceA'));
createOperation('SourceB').subscribe(createObserver('SourceB'));
createOperation('SourceC').subscribe(createObserver('SourceC'));
createOperation('SourceD').subscribe(createObserver('SourceD'));
// Dequeue and run the first
queue.shift().connect();
controlled
Observable is still invoked when you subscribeThe controlled
operator in RxJS 4 was really just controlling the flow of the Observable after the operator. Up to that point, it all pumps through and buffers at that operator. Consider this:
(RxJS 4) http://jsbin.com/yaqabe/1/edit?html,js,console
const source = Rx.Observable.range(0, 5).do(x => console.log('do' + x)).controlled();
source.subscribe(x => console.log(x));
setTimeout(() => {
console.log('requesting');
source.request(2);
}, 1000);
You'll notice all five values from the Observable.range(0, 5)
are emitted by the do
immediately... then a one second (1000ms) pause before you get your two values.
So, it's really the illusion of backpressure control. In the end, there's an unbounded buffer in that operator. An array that is collecting everything that the Observable "above" it is sending down and waiting for you to dequeue it by calling request(n)
.
controlled
At the time of this answer, the controlled
operator does not exist in RxJS 5. This is for a few reasons: 1. No requests for it, and 2. Its name is clearly confusing (hence this question on StackOverflow)
How to replicate the behavior in RxJS 5 (for now): http://jsbin.com/metuyab/1/edit?html,js,console
// A subject we'll use to zip with the source
const controller = new Rx.Subject();
// A request function to next values into the subject
function request(count) {
for (let i = 0; i < count; i++) {
controller.next(count);
}
}
// We'll zip our source with the subject, we don't care about what
// comes out of the Subject, so we'll drop that.
const source = Rx.Observable.range(0, 5).zip(controller, (x, _) => x);
// Same effect as above Rx 4 example
source.subscribe(x => console.log(x));
// Same effect as above Rx 4 example
request(3);
For "real backpressure control" right now, one solution is an iterator of promise. IoP isn't without its problems though, for one thing, there's an object allocation at each turn. Every value has a Promise associated to it. For another thing, cancellation isn't there, because it's promises.
A better, Rx-based approach is to have a Subject that "feeds" the top of your observable chain, and you compose in the rest.
Something like this: http://jsbin.com/qeqaxo/2/edit?js,console
// start with 5 values
const controller = new Rx.BehaviorSubject(5);
// some observable source, in this case, an interval.
const source = Rx.Observable.interval(100)
const controlled = controller.flatMap(
// map your count into a set of values
(count) => source.take(count),
// additional mapping for metadata about when the block is done
(count, value, _, index) => {
return { value: value, done: count - index === 1 };
})
// when the block is done, request 5 more.
.do(({done}) => done && controller.next(5))
// we only care about the value for output
.map(({value}) => value);
// start our subscription
controlled.subscribe(x => {
console.log(x)
});
... we have some plans for a flowable observable type with real backpressure control in the near future, too. That will be more exciting and much better for this sort of scenario.
You can seperate the start of the observable from subscription to it by publishing the observable. The published observable will only be started after calling connect on it.
Note that all subscribers will share a single subscription to the observable sequence.
var published = Observable.of(42).publish();
// subscription does not start the observable sequence
published.subscribe(value => console.log('received: ', value));
// connect starts the sequence; subscribers will now receive values
published.connect();
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With