Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJs Observable with WebSocket

My angular application uses a websocket to communicate with the backend.

In my test case I have 2 client components. The Observable timer prints two different client id's as expected.

Each ngOnInit() also prints the id of its client.

NOW for some reason, the subscription of websocketService.observeClient() is called 2 times for each message but this.client.id always prints the value of the second client.

Heres my client component

@Component({
...
})
export class ClientComponent implements OnInit {

  @Input() client: Client;

  constructor(public websocketService: WebsocketService) {
    Observable.timer(1000, 1000).subscribe(() => console.log(this.client.id));
  }

  ngOnInit() {

    console.log(this.client.id);
    this.websocketService.observeClient().subscribe(data => {
      console.log('message', this.client.id);
    });

  }

}

And my websocket Service

@Injectable()
export class WebsocketService {

  private observable: Observable<MessageEvent>;
  private observer: Subject<Message>;

  constructor() {

    const socket = new WebSocket('ws://localhost:9091');

    this.observable = Observable.create(
      (observer: Observer<MessageEvent>) => {
        socket.onmessage = observer.next.bind(observer);
        socket.onerror = observer.error.bind(observer);
        socket.onclose = observer.complete.bind(observer);
        return socket.close.bind(socket);
      }
    );

    this.observer = Subject.create({
      next: (data: Message) => {
        if (socket.readyState === WebSocket.OPEN) {
          socket.send(JSON.stringify(data));
        }
      }
    });

  }

  observeClient(): Observable<MessageEvent> {
    return this.observable;
  }

}

Edit

Ok as far as I have read it has to do with the fact that Observables are unicast objects and I have to use a Subject for that but I don't know how to create the Subject.

like image 443
Pascal Avatar asked Apr 10 '17 16:04

Pascal


People also ask

How do I use RXJS WebSockets?

You can use wss for secure websocket connection. import { webSocket } from "rxjs/"webSocket; const subject = webSocket("ws://localhost:8081"); This way you have a ready to use subject that you should subscribe to in order to establish the connection with your endpoint and start receiving and sending some data.

How does angular connect to WebSockets?

To make a WebSocket connection it is necessary to use the HTTP protocol upgrade mechanism, this allows us to move from an HTTP 1.1 connection to HTTP 2.0 (which allows using server push feature), or as in this case to move from an HTTP 1.1 connection to a WebSocket connection.

What is stomp over WebSocket?

The WebSocket API enables web applications to handle bidirectional communications whereas STOMP is a simple text-orientated messaging protocol. The STOMP protocol is commonly used inside a web socket when a web app needs to support bidirectional communication with a web server.

What is the difference between WebSocket and socket IO?

Key Differences between WebSocket and socket.io It provides the Connection over TCP, while Socket.io is a library to abstract the WebSocket connections. WebSocket doesn't have fallback options, while Socket.io supports fallback. WebSocket is the technology, while Socket.io is a library for WebSockets.


2 Answers

As of rxjs 5 you can use the built-in websocket feature which creates the subject for you. It also reconnects when you resubscribe to the stream after an error. Please refer to this answer:

https://stackoverflow.com/a/44067972/552203

TLDR:

 let subject = Observable.webSocket('ws://localhost:8081');
 subject
   .retry()
   .subscribe(
      (msg) => console.log('message received: ' + msg),
      (err) => console.log(err),
      () => console.log('complete')
    );
 subject.next(JSON.stringify({ op: 'hello' }));
like image 156
Herman Avatar answered Sep 20 '22 10:09

Herman


Well, I have been suffering with the websockets with angular and python for long. I had some websockets for each component and they didn't close, so every time I pass through one tab to another (changing betweeen components), the websocket was created again in the back and I was receiving twice or more times the same (solution at the end)

There are many tutorials explaining how to do websockets, but not many explaining how to close or how to have several of them.

Here my piece of bread: Best tuto I have found some far: https://medium.com/@lwojciechowski/websockets-with-angular2-and-rxjs-8b6c5be02fac But yet not complete. It does not close properly the WS

Here what I did: (My class WebSocketTestService)

@Injectable()
export class WebSocketTestService {
    public messages: Subject<any>  = new Subject<any>();

    constructor(private wsService: WebSocketService) {
        console.log('constructyor ws synop service')
    }

    public connect() {
        this.messages = <Subject<any>>this.wsService
            .connect('ws://localhost:8000/mytestwsid')
            .map((response: any): any => {
                return JSON.parse(response.data);
            })
    }

    public close() {
        this.wsService.close()
    }
} // end class 

And here (in another file for me) the websocket class

import {Injectable} from '@angular/core';
import {Observable} from 'rxjs/Observable';
import {Subject} from 'rxjs/Subject';
import {Observer} from "rxjs/Observer";

@Injectable()
export class WebSocketService {
    private subject: Subject<MessageEvent>;
    private subjectData: Subject<number>;
  private ws: any;
    // For chat box
    public connect(url: string): Subject<MessageEvent> {
        if (!this.subject) {
            this.subject = this.create(url);
        }
        return this.subject;
    }

    private create(url: string): Subject<MessageEvent> {
        this.ws = new WebSocket(url);

        let observable = Observable.create(
            (obs: Observer<MessageEvent>) => {
                this.ws.onmessage = obs.next.bind(obs);
                this.ws.onerror   = obs.error.bind(obs);
                this.ws.onclose   = obs.complete.bind(obs);

                return this.ws.close.bind(this.ws);
            });

        let observer = {
            next: (data: Object) => {
                if (this.ws.readyState === WebSocket.OPEN) {
                    this.ws.send(JSON.stringify(data));
                }
            }
        };

        return Subject.create(observer, observable);
  }
  
  public close() {
    console.log('on closing WS');
    this.ws.close()
    this.subject = null
  }

} // end class WebSocketService

And finally, my call to the WSTestService in the component

this.webSocketTestService.connect();
this.webSocketTestService.messages.subscribe(message => {
      console.log(message);

})

and in the most important part, the ngOnDestroy()

ngOnDestroy() {
    this.webSocketTestService.messages.unsubscribe();
    this.webSocketTestService.close()

Of course, add both services to the providers section in the app.module

It is the only way I have found to close properly the websocket when I change between views and I destroy components. I'm not really sure if it is your case but it is being hard for me to find an example who works properly with multiple views and websockets. I had similar problems to the ones you asked, so I hope it works for you. Let me know if my approach works for you.

Edit!: Actually, after some time I found that my problem was more related to the services cycle of life rather than the websockets. A ws service injected in root is shared by all the components so if it was reused to create a new websocket, I was overriding the first one as it was the same service. The best way to keep a webscoket for a determinated view is to inject the WS-service in the component that uses

providers: [Your websocket service]

Let me know if you have questions, I'll try to answer them.

like image 24
Javier Avatar answered Sep 20 '22 10:09

Javier