Property 'connect' does not exist on type 'Observable<any>' | RxJS multicast

I have an Observable that emits its values unicast (independently to each observer). When I try to convert it to multicast with the RxJS multicast operator, I get the following error:

Property 'connect' does not exist on type 'Observable<any>'

Unicast (working code):

import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

let source4$ = interval(1000).pipe(take(4));

source4$.subscribe(val => {
    console.log(`Observer 1: ${val}`);
});

setTimeout(function() {
    source4$.subscribe(val => {
        console.log(`Observer 2: ${val}`);
    }); 
}, 1000);
setTimeout(function() {
    source4$.subscribe(val => {
        console.log(`Observer 3: ${val}`);
    }); 
}, 2000);

Multicast (non-working code):

let source4$ = interval(1000).pipe(take(4), multicast(new Subject()));

source4$.subscribe(val => {
    console.log(`Observer 1: ${val}`);
});

setTimeout(function() {
    source4$.subscribe(val => {
        console.log(`Observer 2: ${val}`);
    }); 
}, 1000);
setTimeout(function() {
    source4$.subscribe(val => {
        console.log(`Observer 3: ${val}`);
    }); 
}, 2000);

source4$.connect();
Sachin Kumar asked Jan 19 '19 07:01



2 Answers

You're actually correct here. The multicast operator really returns an instance of ConnectableObservable (https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/multicast.ts#L54).

This is purely a TypeScript typing issue: pipe() is always declared to return a plain Observable (https://github.com/ReactiveX/rxjs/blob/master/src/internal/Observable.ts#L301-L311).

This has been reported, and there is an open issue on the RxJS GitHub page: https://github.com/ReactiveX/rxjs/issues/2972

The easiest workaround is to cast the returned Observable to the type it really has:

import { ConnectableObservable } from 'rxjs';

const source4$ = interval(1000).pipe(...) as ConnectableObservable<number>;
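To see why the cast is safe, here is a dependency-free sketch of the typing problem. MiniObservable, MiniConnectable and miniMulticast are made-up stand-ins written for this illustration, not the RxJS classes, and the real implementations are more involved. The only point is that a pipe() declared to return the base class hides a method the runtime object does have.

```typescript
// Simplified stand-ins for illustration only; these are NOT the RxJS classes.
type Observer<T> = (value: T) => void;

class MiniObservable<T> {
  constructor(private readonly producer: (observer: Observer<T>) => void) {}

  subscribe(observer: Observer<T>): void {
    this.producer(observer);
  }

  // Declared to return the base class, whatever the operator actually returns.
  pipe<R>(op: (source: MiniObservable<T>) => MiniObservable<R>): MiniObservable<R> {
    return op(this);
  }
}

class MiniConnectable<T> extends MiniObservable<T> {
  private readonly observers: Observer<T>[] = [];

  constructor(private readonly source: MiniObservable<T>) {
    super(() => {}); // base producer is unused; subscribe() is overridden below
  }

  // Subscribing only registers the observer; nothing is emitted until connect().
  subscribe(observer: Observer<T>): void {
    this.observers.push(observer);
  }

  // Subscribe to the source once and fan each value out to every observer.
  connect(): void {
    this.source.subscribe(value => this.observers.forEach(observer => observer(value)));
  }
}

// Hypothetical operator with the same shape as multicast(): it returns the subclass.
function miniMulticast<T>(): (source: MiniObservable<T>) => MiniConnectable<T> {
  return source => new MiniConnectable(source);
}

const numbers = new MiniObservable<number>(observer => [1, 2, 3].forEach(observer));

// The static type is only MiniObservable<number>, so piped.connect() would be
// rejected by the compiler even though the object is a MiniConnectable.
const piped: MiniObservable<number> = numbers.pipe(miniMulticast<number>());
const hasConnectAtRuntime = piped instanceof MiniConnectable; // true

// The workaround: assert the more specific type.
const shared = piped as MiniConnectable<number>;
const log: string[] = [];
shared.subscribe(value => log.push(`A:${value}`));
shared.subscribe(value => log.push(`B:${value}`));
shared.connect();
// log is now ["A:1", "B:1", "A:2", "B:2", "A:3", "B:3"]
```

The assertion changes nothing at runtime. It only tells the compiler what the operator already produced, which is why it is safe here and would be wrong for an operator that returns a plain Observable.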
martin answered Oct 08 '22 10:10



You can work around it by not using pipe() to apply the operator, since all pipe() does here is call the function returned by multicast().

For example, observable.pipe(take(5)) is the same as take(5)(observable).

The only drawback is that TypeScript can no longer infer the Observable's value type, so you have to specify it when you create the Subject.

import { interval, Subject } from 'rxjs';
import { multicast, take } from 'rxjs/operators';

let source4$ = multicast(new Subject<number>())(interval(1000).pipe(take(4)));

source4$.subscribe(val => {
    console.log(`Observer 1: ${val}`);
});

setTimeout(function() {
    source4$.subscribe(val => {
        console.log(`Observer 2: ${val}`);
    }); 
}, 1000);
setTimeout(function() {
    source4$.subscribe(val => {
        console.log(`Observer 3: ${val}`);
    }); 
}, 2000);

source4$.connect();
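The equivalence between pipe(op) and op(source) can be checked in isolation with a small dependency-free model. Source, fromArray and takeFirst below are hypothetical names invented for this sketch (takeFirst plays the role of take), so treat it as an illustration of the idea rather than of how RxJS is implemented.

```typescript
// Toy model for illustration; Source, fromArray and takeFirst are hypothetical
// stand-ins and not part of RxJS.
interface Source<T> {
  toArray(): T[];
  pipe<R>(op: (source: Source<T>) => Source<R>): Source<R>;
}

function fromArray<T>(values: T[]): Source<T> {
  const src: Source<T> = {
    toArray: () => values.slice(),
    // pipe() adds nothing of its own: it calls the operator function with the source.
    pipe<R>(op: (source: Source<T>) => Source<R>): Source<R> {
      return op(src);
    },
  };
  return src;
}

// Operator factory in the style of take(n): returns a function from source to source.
function takeFirst<T>(count: number): (source: Source<T>) => Source<T> {
  return source => fromArray(source.toArray().slice(0, count));
}

const letters = fromArray(['a', 'b', 'c', 'd']);

// The type argument is written out in both calls so the two forms line up exactly.
const viaPipe = letters.pipe(takeFirst<string>(2)).toArray();
const viaCall = takeFirst<string>(2)(letters).toArray();
// Both arrays are ['a', 'b'].
```

Calling the operator directly keeps whatever return type the operator declares, which is why connect() becomes visible in the answer's code without a cast.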
Grabofus answered Oct 08 '22 10:10
