I have an Observable that produces unicast values (each observer gets its own independent sequence). But when I try to convert it to multicast using the RxJS multicast operator, I get the following error:
Property 'connect' does not exist on type 'Observable'
Unicast (Working Code) -
let source4$ = interval(1000).pipe(take(4));
source4$.subscribe(val => {
console.log(`Observer 1: ${val}`);
});
setTimeout(function() {
source4$.subscribe(val => {
console.log(`Observer 2: ${val}`);
});
}, 1000);
setTimeout(function() {
source4$.subscribe(val => {
console.log(`Observer 3: ${val}`);
});
}, 2000);
Multicast (Not Working Code) -
let source4$ = interval(1000).pipe(take(4), multicast(new Subject()));
source4$.subscribe(val => {
console.log(`Observer 1: ${val}`);
});
setTimeout(function() {
source4$.subscribe(val => {
console.log(`Observer 2: ${val}`);
});
}, 1000);
setTimeout(function() {
source4$.subscribe(val => {
console.log(`Observer 3: ${val}`);
});
}, 2000);
source4$.connect();
You're actually correct here. The multicast operator really does return an instance of ConnectableObservable (https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/multicast.ts#L54).
This is just an issue with the TypeScript types: pipe() is always typed as returning a plain Observable (https://github.com/ReactiveX/rxjs/blob/master/src/internal/Observable.ts#L301-L311).
This has been reported, and there is an open issue on the RxJS GitHub page: https://github.com/ReactiveX/rxjs/issues/2972
The easiest workaround is to cast the returned Observable to ConnectableObservable:
const source4$ = interval(1000).pipe(...) as ConnectableObservable<number>
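As a fuller illustration of the cast, here is a minimal sketch, assuming RxJS 6 or 7 (where multicast is exported from 'rxjs/operators'). It swaps interval(1000).pipe(take(4)) for of(0, 1, 2, 3) purely so that it runs synchronously; the names source$, log and connection are made up for the example.

```typescript
import { ConnectableObservable, Subject, of } from 'rxjs';
import { multicast } from 'rxjs/operators';

const log: string[] = [];

// Assumption: of(...) stands in for interval(1000).pipe(take(4)) so that
// everything below happens synchronously.
const source$ = of(0, 1, 2, 3).pipe(
  multicast(new Subject<number>())
) as ConnectableObservable<number>; // the cast restores the type that pipe() widened

source$.subscribe(val => log.push(`Observer 1: ${val}`));
source$.subscribe(val => log.push(`Observer 2: ${val}`));

// Nothing is emitted until connect() subscribes the Subject to the source.
const connection = source$.connect();

console.log(log);
connection.unsubscribe();
```

Both observers share the single underlying subscription, so each value is delivered to Observer 1 and then to Observer 2 before the next value arrives.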
You could also get around it by not using the pipe() function to apply your operator, as all it's doing is calling the function returned by multicast().
For example: observable.pipe(take(5)) is the same as take(5)(observable)
The only drawback of doing it this way is that TypeScript will not be able to infer the type of the Observable, so you have to specify it when you create your Subject.
import { interval, Subject } from 'rxjs';
import { multicast, take } from 'rxjs/operators';
let source4$ = multicast(new Subject<number>())(interval(1000).pipe(take(4)));
source4$.subscribe(val => {
console.log(`Observer 1: ${val}`);
});
setTimeout(function() {
source4$.subscribe(val => {
console.log(`Observer 2: ${val}`);
});
}, 1000);
setTimeout(function() {
source4$.subscribe(val => {
console.log(`Observer 3: ${val}`);
});
}, 2000);
source4$.connect();
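To see why calling the operator function directly keeps connect() visible while going through pipe() hides it, the typing can be modelled without RxJS at all. This is only a sketch: Source, ConnectableSource and publishLike are hypothetical stand-ins, not RxJS classes, and the pipe() signature is a simplification of the typings discussed above.

```typescript
// Hypothetical stand-ins, not RxJS classes: just enough to model the typing.
class Source<T> {
  constructor(readonly values: T[]) {}

  // Like the pipe() typings discussed above: whatever the operator returns
  // at runtime, the declared return type is the plain base class.
  pipe<R>(op: (src: Source<T>) => Source<R>): Source<R> {
    return op(this);
  }
}

class ConnectableSource<T> extends Source<T> {
  connected = false;

  connect(): void {
    this.connected = true;
  }
}

// An operator factory whose returned function yields the more specific subclass.
function publishLike<T>(): (src: Source<T>) => ConnectableSource<T> {
  return (src) => new ConnectableSource<T>(src.values);
}

const base = new Source<number>([1, 2, 3]);

// Through pipe(): the static type is widened to Source<number>, even though
// the runtime object is a ConnectableSource.
const viaPipe = base.pipe(publishLike<number>());
// viaPipe.connect(); // would not compile: 'connect' does not exist on Source<number>

// Direct call: the operator's own return type is preserved.
const direct = publishLike<number>()(base);
direct.connect(); // compiles

console.log(viaPipe instanceof ConnectableSource, direct.connected);
```

In both cases the runtime object is the same kind of thing; only the declared type differs, which is why either a cast or a direct call resolves the compile error.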