I'm working on an Angular 8 (with Electron 6 and Ionic 4) project and right now we are having evaluation phase where we are deciding whether to replace polling with SSE (Server-sent events) or Web Sockets. My part of the job is to research SSE.
I created small express application which generates random numbers and it all works fine. The only thing that bugs me is correct way to reconnect on server error.
My implementation looks like this:
private createSseSource(): Observable<MessageEvent> {
return Observable.create(observer => {
this.eventSource = new EventSource(SSE_URL);
this.eventSource.onmessage = (event) => {
this.zone.run(() => observer.next(event));
};
this.eventSource.onopen = (event) => {
console.log('connection open');
};
this.eventSource.onerror = (error) => {
console.log('looks like the best thing to do is to do nothing');
// this.zone.run(() => observer.error(error));
// this.closeSseConnection();
// this.reconnectOnError();
};
});
}
I tried to implement reconnectOnError()
function following this answer, but I just wasn't able to make it work. Then I ditched the reconnectOnError()
function and it seems like it's a better thing to do. Do not try to close and reconnect nor propagate error to observable. Just sit and wait and when the server is running again it will reconnect automatically.
Question is, is this really the best thing to do? Important thing to mention is, that the FE application communicates with it's own server which can't be accessed by another instance of the app (built-in device).
I see that my question is getting some attention so I decided to post my solution. To answer my question: "Is this really the best thing to do, to omit reconnect function?" I don't know :). But this solution works for me and it was proven in production, that it offers way how to actually control SSE reconnect to some extent.
Here's what I did:
createSseSource
function so the return type is void
openSseChannel
and private reconnectOnError
functions for better controlprocessSseEvent
to handle custom message typesSince I'm using NgRx on this project every SSE message dispatches corresponding action, but this can be replaced by ReplaySubject
and exposed as observable
.
// Public function, initializes connection, returns true if successful
openSseChannel(): boolean {
this.createSseEventSource();
return !!this.eventSource;
}
// Creates SSE event source, handles SSE events
protected createSseEventSource(): void {
// Close event source if current instance of SSE service has some
if (this.eventSource) {
this.closeSseConnection();
this.eventSource = null;
}
// Open new channel, create new EventSource
this.eventSource = new EventSource(this.sseChannelUrl);
// Process default event
this.eventSource.onmessage = (event: MessageEvent) => {
this.zone.run(() => this.processSseEvent(event));
};
// Add custom events
Object.keys(SSE_EVENTS).forEach(key => {
this.eventSource.addEventListener(SSE_EVENTS[key], event => {
this.zone.run(() => this.processSseEvent(event));
});
});
// Process connection opened
this.eventSource.onopen = () => {
this.reconnectFrequencySec = 1;
};
// Process error
this.eventSource.onerror = (error: any) => {
this.reconnectOnError();
};
}
// Processes custom event types
private processSseEvent(sseEvent: MessageEvent): void {
const parsed = sseEvent.data ? JSON.parse(sseEvent.data) : {};
switch (sseEvent.type) {
case SSE_EVENTS.STATUS: {
this.store.dispatch(StatusActions.setStatus({ status: parsed }));
// or
// this.someReplaySubject.next(parsed);
break;
}
// Add others if neccessary
default: {
console.error('Unknown event:', sseEvent.type);
break;
}
}
}
// Handles reconnect attempts when the connection fails for some reason.
// const SSE_RECONNECT_UPPER_LIMIT = 64;
private reconnectOnError(): void {
const self = this;
this.closeSseConnection();
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = setTimeout(() => {
self.openSseChannel();
self.reconnectFrequencySec *= 2;
if (self.reconnectFrequencySec >= SSE_RECONNECT_UPPER_LIMIT) {
self.reconnectFrequencySec = SSE_RECONNECT_UPPER_LIMIT;
}
}, this.reconnectFrequencySec * 1000);
}
Since the SSE events are fed to subject/actions it doesn't matter if the connection is lost since at least last event is preserved within subject or store. Attempts to reconnect can then happen silently and when new data are send, there are processed seamlessly.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With