I am trying to display server-sent events emitted values in an angular 2 /RxJs app.
The backend regularly sends individual strings to the client through server-sent events.
I am not sure how to deal with the retrieved values on the angular 2/RxJs side.
Here is my client (a ng component):
import {Component, OnInit} from 'angular2/core';
import {Http, Response} from 'angular2/http';
import 'rxjs/Rx';
import {Observable} from 'rxjs/Observable';
@Component({
selector: 'my-app',
template: `<h1>My second Angular 2 App</h1>
<ul>
<li *ngFor="#s of someStrings | async">
a string: {{ s }}
</li>
</ul>
`
})
export class AppComponent implements OnInit {
constructor(private http:Http) {
}
errorMessage:string;
someStrings:string[];
ngOnInit() {
this.getSomeStrings()
.subscribe(
aString => this.someStrings.push(aString),
error => this.errorMessage = <any>error);
}
private getSomeStrings():Observable<string> {
return this.http.get('interval-sse-observable')
.map(this.extractData)
.catch(this.handleError);
}
private extractData(res:Response) {
if (res.status < 200 || res.status >= 300) {
throw new Error('Bad response status: ' + res.status);
}
let body = res.json();
return body || {};
}
private handleError(error:any) {
// In a real world app, we might send the error to remote logging infrastructure
let errMsg = error.message || 'Server error';
console.error(errMsg); // log to console instead
return Observable.throw(errMsg);
}
}
The backend method is as follows (and uses RxJava):
@ResponseStatus(HttpStatus.OK)
@RequestMapping(method = RequestMethod.GET, path = "interval-sse-observable")
public SseEmitter tickSseObservable() {
return RxResponse.sse(
Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
.map(tick -> randomUUID().toString())
);
}
I just noticed that the app hangs on the request and that nothing is displayed on the page.
I suspect there is an issue with my use of the map method i.e. .map(this.extractData)
.
I would just like to add the incoming strings to the array and display that array in the template which would update as the strings come in.
Can anyone please help?
edit: Here is a working solution (thanks to Thierry's answer below):
import {Component, OnInit} from 'angular2/core';
import 'rxjs/Rx';
@Component({
selector: 'my-app',
template: `<h1>My second Angular 2 App</h1>
<ul>
<li *ngFor="#s of someStrings">
a string: {{ s }}
</li>
</ul>
`
})
export class AppComponent implements OnInit {
someStrings:string[] = [];
ngOnInit() {
let source = new EventSource('/interval-sse-observable');
source.addEventListener('message', aString => this.someStrings.push(aString.data), false);
}
}
Here is a working example :
SseService
import {Injectable} from '@angular/core';
import {Observable} from 'rxjs/Observable';
declare var EventSource;
@Injectable()
export class SseService {
constructor() {
}
observeMessages(sseUrl: string): Observable<string> {
return new Observable<string>(obs => {
const es = new EventSource(sseUrl);
es.addEventListener('message', (evt) => {
console.log(evt.data);
obs.next(evt.data);
});
return () => es.close();
});
}
}
AppComponent
import {Component, OnDestroy, OnInit} from '@angular/core';
import {SseService} from './shared/services/sse/sse.service';
import {Observable, Subscription} from 'rxjs/Rx';
@Component({
selector: 'my-app',
template: `<h1>Angular Server-Sent Events</h1>
<ul>
<li *ngFor="let message of messages">
{{ message }}
</li>
</ul>
`
})
export class AppComponent implements OnInit, OnDestroy {
private sseStream: Subscription;
messages:Array<string> = [];
constructor(private sseService: SseService){
}
ngOnInit() {
this.sseStream = this.sseService.observeMessages('https://server.com/mysse')
.subscribe(message => {
messages.push(message);
});
}
ngOnDestroy() {
if (this.sseStream) {
this.sseStream.unsubscribe();
}
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With