I am learning Rx.js and have one problem with zip
operator:
var error =Rx.Observable.throw('Oop!');
var age$ = Rx.Observable.concat(Rx.Observable.of(21,22,23),error);
var sex$ = Rx.Observable.of("male","male","female","female");
var name$ = Rx.Observable.of("jack","john","james","lucy");
var example = Rx.Observable.zip(age$,sex$,name$,(age,sex,name)=>{ return {age,sex,name} });
and i subscribe the example
source and print some message:
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
the out put is not what i expected:
{age:21,sex:"male",name:"jack"}
{age:22,sex:"male",name:"john"}
{age:23,sex:"female",name:"james"}
error
but just one line with no value output
:
error
read the offical doc but no chapter explained when the zip
operator emit error
.
Can anyone help?thx very much.
RxJS implements this operator as zip and zipArray . zip accepts a variable number of Observables or Promises as parameters, followed by a function that accepts one item emitted by each of those Observables or resolved by those Promises as input and produces a single item to be emitted by the resulting Observable.
Below an example following the same idea as the previous code, but instead of Zip, we are now using CombineLatest.
There are mainly two types of RxJS operators: Static Operators: The static operators are generally used to create observables. These types of operators can be found mainly under the creation operators. Instance Operators: The instance operators are methods on observable instances.
You see the error straight away because the first observable that you pass emits its values synchronously. (The other observables emit their values synchronously, too, but that does not matter in this scenario.)
zip
subscribes to the passed observables one by one and in the order in which they are passed. Upon subscribing to the first observable, zip
synchronously receives all of the observable's values and the concatenated error. It then emits its own error and is done.
If you specify the optional scheduler argument - so that the observables emit their values asynchronously - you will see the behaviour you were expecting:
var age$ = Rx.Observable.concat(
Rx.Observable.of(21, 22, 23, Rx.Scheduler.async),
Rx.Observable.throw("Oop!", Rx.Scheduler.async)
);
var sex$ = Rx.Observable.of(
"male", "male", "female", "female",
Rx.Scheduler.async
);
var name$ = Rx.Observable.of(
"jack", "john", "james", "lucy",
Rx.Scheduler.async
);
var zipped$ = Rx.Observable.zip(
age$, sex$, name$,
(age, sex, name) => ({ age, sex, name })
);
zipped$.subscribe(
(value) => console.log(value),
(error) => console.log(error),
() => console.log("complete")
);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With