
Angular 8 - handling SSE reconnect on error

I'm working on an Angular 8 (with Electron 6 and Ionic 4) project, and we are currently in an evaluation phase, deciding whether to replace polling with SSE (Server-Sent Events) or WebSockets. My part of the job is to research SSE.

I created a small Express application that generates random numbers, and it all works fine. The only thing that bugs me is the correct way to reconnect after a server error.

My implementation looks like this:

  private createSseSource(): Observable<MessageEvent> {
    return Observable.create(observer => {
      this.eventSource = new EventSource(SSE_URL);
      this.eventSource.onmessage = (event) => {
        this.zone.run(() => observer.next(event));
      };

      this.eventSource.onopen = (event) => {
        console.log('connection open');
      };

      this.eventSource.onerror = (error) => {
        console.log('looks like the best thing to do is to do nothing');
        // this.zone.run(() => observer.error(error));
        // this.closeSseConnection();
        // this.reconnectOnError();
      };
    });
  }

I tried to implement a reconnectOnError() function following this answer, but I wasn't able to make it work. I then dropped reconnectOnError() altogether, and that seems to work better: don't close and reconnect, and don't propagate the error to the observable. Just wait, and once the server is running again the connection is re-established automatically.

The question is: is this really the best thing to do? It is important to mention that the FE application communicates with its own server, which can't be accessed by another instance of the app (built-in device).
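
For context on the "do nothing" option: per the HTML standard, a native EventSource retries on its own after a network-level failure (readyState goes back to CONNECTING), but it gives up permanently (readyState CLOSED) when the server responds with, for example, a non-200 status or a wrong content type. Below is a minimal sketch of telling the two cases apart inside onerror. The names needsManualReconnect and SSE_CONNECTING / SSE_OPEN / SSE_CLOSED are hypothetical and not part of the code above.

```typescript
// Numeric values of EventSource.CONNECTING / OPEN / CLOSED (HTML standard).
const SSE_CONNECTING = 0;
const SSE_OPEN = 1;
const SSE_CLOSED = 2;

// Hypothetical helper: returns true only when the browser has given up,
// i.e. no automatic reconnect will follow this error event.
function needsManualReconnect(readyState: number): boolean {
  return readyState === SSE_CLOSED;
}

// Intended use inside the error handler (kept as a comment because it needs
// a live EventSource):
//
//   this.eventSource.onerror = () => {
//     if (needsManualReconnect(this.eventSource.readyState)) {
//       // the browser will not retry; reconnect explicitly here
//     }
//     // otherwise the browser is already reconnecting on its own
//   };
```

Under this reading, doing nothing is enough for plain network drops, while a server that comes back answering with an error status would leave the connection closed until something reopens it.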

mat.hudak asked Sep 24 '19 07:09


1 Answer

I see that my question is getting some attention, so I decided to post my solution. To answer my own question, "Is it really the best thing to do, to omit the reconnect function?": I don't know :). But this solution works for me, and it has been proven in production that it offers a way to control SSE reconnects to some extent.

Here's what I did:

  1. Rewrote the createSseSource function (now createSseEventSource) so that its return type is void
  2. Instead of returning an observable, data from SSE is fed to subjects/NgRx actions
  3. Added a public openSseChannel function and a private reconnectOnError function for better control
  4. Added a private processSseEvent function to handle custom message types

Since I'm using NgRx on this project, every SSE message dispatches a corresponding action, but this can be replaced by a ReplaySubject exposed as an observable.

// Public function, initializes connection, returns true if successful
openSseChannel(): boolean {
  this.createSseEventSource();
  return !!this.eventSource;
}

// Creates SSE event source, handles SSE events
protected createSseEventSource(): void {
  // Close event source if current instance of SSE service has some
  if (this.eventSource) {
    this.closeSseConnection();
    this.eventSource = null;
  }
  // Open new channel, create new EventSource
  this.eventSource = new EventSource(this.sseChannelUrl);

  // Process default event
  this.eventSource.onmessage = (event: MessageEvent) => {
    this.zone.run(() => this.processSseEvent(event));
  };

  // Add custom events
  Object.keys(SSE_EVENTS).forEach(key => {
    this.eventSource.addEventListener(SSE_EVENTS[key], event => {
      this.zone.run(() => this.processSseEvent(event));
    });
  });

  // Process connection opened
  this.eventSource.onopen = () => {
    this.reconnectFrequencySec = 1;
  };

  // Process error
  this.eventSource.onerror = (error: any) => {
    this.reconnectOnError();
  };
}

// Processes custom event types
private processSseEvent(sseEvent: MessageEvent): void {
  const parsed = sseEvent.data ? JSON.parse(sseEvent.data) : {};
  switch (sseEvent.type) {
    case SSE_EVENTS.STATUS: {
      this.store.dispatch(StatusActions.setStatus({ status: parsed }));
      // or
      // this.someReplaySubject.next(parsed);
      break;
    }
    // Add others if necessary
    default: {
      console.error('Unknown event:', sseEvent.type);
      break;
    }
  }
}

// Handles reconnect attempts when the connection fails for some reason.
// Relies on a constant defined elsewhere: const SSE_RECONNECT_UPPER_LIMIT = 64;
private reconnectOnError(): void {
  this.closeSseConnection();
  clearTimeout(this.reconnectTimeout);
  // Retry after the current delay, then double the delay for the next attempt (capped)
  this.reconnectTimeout = setTimeout(() => {
    this.openSseChannel();
    this.reconnectFrequencySec = Math.min(
      this.reconnectFrequencySec * 2,
      SSE_RECONNECT_UPPER_LIMIT
    );
  }, this.reconnectFrequencySec * 1000);
}
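
To make the resulting retry timing concrete, here is a small standalone sketch of the delay sequence that reconnectOnError() produces when every attempt fails. It assumes reconnectFrequencySec starts at 1 (the initial value is not shown above, only the reset in onopen) and uses the limit of 64 from the comment. backoffScheduleSec is a hypothetical helper written only for illustration.

```typescript
// Hypothetical helper: lists the delay (in seconds) waited before each
// consecutive reconnect attempt, assuming the delay starts at 1 second and
// no attempt succeeds in between (a successful open would reset it to 1).
function backoffScheduleSec(attempts: number, upperLimitSec: number): number[] {
  const delays: number[] = [];
  let delaySec = 1;
  for (let i = 0; i < attempts; i++) {
    delays.push(delaySec);
    // Same rule as reconnectOnError(): double, then clamp to the upper limit.
    delaySec = Math.min(delaySec * 2, upperLimitSec);
  }
  return delays;
}

const schedule = backoffScheduleSec(9, 64);
console.log(schedule.join(', ')); // prints "1, 2, 4, 8, 16, 32, 64, 64, 64"
```

So after roughly two minutes of failures the service settles into one attempt every 64 seconds, and the first successful onopen brings the delay back to 1 second.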

Since the SSE events are fed to a subject/actions, it doesn't matter if the connection is lost, because at least the last event is preserved within the subject or store. Attempts to reconnect can then happen silently, and when new data is sent, it is processed seamlessly.
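
The "last event is preserved" behaviour can be sketched without any library. The class below is a hand-rolled stand-in that mimics what a ReplaySubject with a buffer size of 1 would provide; it is not RxJS itself, and LastValueChannel and the status shape are hypothetical names used only for illustration.

```typescript
type Listener<T> = (value: T) => void;

// Minimal stand-in for "replay the last value to late subscribers".
class LastValueChannel<T> {
  private hasValue = false;
  private lastValue?: T;
  private listeners: Listener<T>[] = [];

  // Called for every incoming SSE message.
  next(value: T): void {
    this.hasValue = true;
    this.lastValue = value;
    this.listeners.forEach(listener => listener(value));
  }

  // New subscribers immediately receive the last known value, if any.
  // Returns a function that removes the subscription.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.push(listener);
    if (this.hasValue) {
      listener(this.lastValue as T);
    }
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }
}

// Usage: the value arrives before anyone listens, e.g. just before the
// connection drops; a component created later still sees it.
const status = new LastValueChannel<{ online: boolean }>();
status.next({ online: true });
const unsubscribe = status.subscribe(s => {
  console.log('status:', s.online);
});
unsubscribe();
```

With the NgRx variant the store plays the same role: the last dispatched status stays in state while the reconnect attempts run in the background.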

mat.hudak answered Oct 29 '22 09:10