How to use SignalR with Angular 2?
How to manually run change detection when receiving data from SignalR?
I recently wrote an article that demonstrates one way to integrate Angular 2 and SignalR using a "channel/event" model:
https://blog.sstorie.com/integrating-angular-2-and-signalr-part-2-of-2/
I don't think just linking to another site is considered appropriate, so here's the core of the Angular 2 service that exposes SignalR:
import {Injectable, Inject} from "angular2/core";
import Rx from "rxjs/Rx";
/**
* When SignalR runs it will add functions to the global $ variable
* that you use to create connections to the hub. However, in this
* class we won't want to depend on any global variables, so this
* class provides an abstraction away from using $ directly in here.
*/
export class SignalrWindow extends Window {
$: any;
}
export enum ConnectionState {
Connecting = 1,
Connected = 2,
Reconnecting = 3,
Disconnected = 4
}
export class ChannelConfig {
url: string;
hubName: string;
channel: string;
}
export class ChannelEvent {
Name: string;
ChannelName: string;
Timestamp: Date;
Data: any;
Json: string;
constructor() {
this.Timestamp = new Date();
}
}
class ChannelSubject {
channel: string;
subject: Rx.Subject<ChannelEvent>;
}
/**
* ChannelService is a wrapper around the functionality that SignalR
* provides to expose the ideas of channels and events. With this service
* you can subscribe to specific channels (or groups in signalr speak) and
* use observables to react to specific events sent out on those channels.
*/
@Injectable()
export class ChannelService {
/**
* starting$ is an observable available to know if the signalr
* connection is ready or not. On a successful connection this
* stream will emit a value.
*/
starting$: Rx.Observable<any>;
/**
* connectionState$ provides the current state of the underlying
* connection as an observable stream.
*/
connectionState$: Rx.Observable<ConnectionState>;
/**
* error$ provides a stream of any error messages that occur on the
* SignalR connection
*/
error$: Rx.Observable<string>;
// These are used to feed the public observables
//
private connectionStateSubject = new Rx.Subject<ConnectionState>();
private startingSubject = new Rx.Subject<any>();
private errorSubject = new Rx.Subject<any>();
// These are used to track the internal SignalR state
//
private hubConnection: any;
private hubProxy: any;
// An internal array to track what channel subscriptions exist
//
private subjects = new Array<ChannelSubject>();
constructor(
@Inject(SignalrWindow) private window: SignalrWindow,
@Inject("channel.config") private channelConfig: ChannelConfig
) {
if (this.window.$ === undefined || this.window.$.hubConnection === undefined) {
throw new Error("The variable '$' or the .hubConnection() function are not defined...please check the SignalR scripts have been loaded properly");
}
// Set up our observables
//
this.connectionState$ = this.connectionStateSubject.asObservable();
this.error$ = this.errorSubject.asObservable();
this.starting$ = this.startingSubject.asObservable();
this.hubConnection = this.window.$.hubConnection();
this.hubConnection.url = channelConfig.url;
this.hubProxy = this.hubConnection.createHubProxy(channelConfig.hubName);
// Define handlers for the connection state events
//
this.hubConnection.stateChanged((state: any) => {
let newState = ConnectionState.Connecting;
switch (state.newState) {
case this.window.$.signalR.connectionState.connecting:
newState = ConnectionState.Connecting;
break;
case this.window.$.signalR.connectionState.connected:
newState = ConnectionState.Connected;
break;
case this.window.$.signalR.connectionState.reconnecting:
newState = ConnectionState.Reconnecting;
break;
case this.window.$.signalR.connectionState.disconnected:
newState = ConnectionState.Disconnected;
break;
}
// Push the new state on our subject
//
this.connectionStateSubject.next(newState);
});
// Define handlers for any errors
//
this.hubConnection.error((error: any) => {
// Push the error on our subject
//
this.errorSubject.next(error);
});
this.hubProxy.on("onEvent", (channel: string, ev: ChannelEvent) => {
//console.log(`onEvent - ${channel} channel`, ev);
// This method acts like a broker for incoming messages. We
// check the interal array of subjects to see if one exists
// for the channel this came in on, and then emit the event
// on it. Otherwise we ignore the message.
//
let channelSub = this.subjects.find((x: ChannelSubject) => {
return x.channel === channel;
}) as ChannelSubject;
// If we found a subject then emit the event on it
//
if (channelSub !== undefined) {
return channelSub.subject.next(ev);
}
});
}
/**
* Start the SignalR connection. The starting$ stream will emit an
* event if the connection is established, otherwise it will emit an
* error.
*/
start(): void {
// Now we only want the connection started once, so we have a special
// starting$ observable that clients can subscribe to know know if
// if the startup sequence is done.
//
// If we just mapped the start() promise to an observable, then any time
// a client subscried to it the start sequence would be triggered
// again since it's a cold observable.
//
this.hubConnection.start()
.done(() => {
this.startingSubject.next();
})
.fail((error: any) => {
this.startingSubject.error(error);
});
}
/**
* Get an observable that will contain the data associated with a specific
* channel
* */
sub(channel: string): Rx.Observable<ChannelEvent> {
// Try to find an observable that we already created for the requested
// channel
//
let channelSub = this.subjects.find((x: ChannelSubject) => {
return x.channel === channel;
}) as ChannelSubject;
// If we already have one for this event, then just return it
//
if (channelSub !== undefined) {
console.log(`Found existing observable for ${channel} channel`)
return channelSub.subject.asObservable();
}
//
// If we're here then we don't already have the observable to provide the
// caller, so we need to call the server method to join the channel
// and then create an observable that the caller can use to received
// messages.
//
// Now we just create our internal object so we can track this subject
// in case someone else wants it too
//
channelSub = new ChannelSubject();
channelSub.channel = channel;
channelSub.subject = new Rx.Subject<ChannelEvent>();
this.subjects.push(channelSub);
// Now SignalR is asynchronous, so we need to ensure the connection is
// established before we call any server methods. So we'll subscribe to
// the starting$ stream since that won't emit a value until the connection
// is ready
//
this.starting$.subscribe(() => {
this.hubProxy.invoke("Subscribe", channel)
.done(() => {
console.log(`Successfully subscribed to ${channel} channel`);
})
.fail((error: any) => {
channelSub.subject.error(error);
});
},
(error: any) => {
channelSub.subject.error(error);
});
return channelSub.subject.asObservable();
}
// Not quite sure how to handle this (if at all) since there could be
// more than 1 caller subscribed to an observable we created
//
// unsubscribe(channel: string): Rx.Observable<any> {
// this.observables = this.observables.filter((x: ChannelObservable) => {
// return x.channel === channel;
// });
// }
/** publish provides a way for calles to emit events on any channel. In a
* production app the server would ensure that only authorized clients can
* actually emit the message, but here we're not concerned about that.
*/
publish(ev: ChannelEvent): void {
this.hubProxy.invoke("Publish", ev);
}
}
Then a component could use this service by subscribing (not in the rxjs sense...) to a specific channel, and reacting to specific events emitted:
import {Component, OnInit, Input} from "angular2/core";
import {Http, Response} from "angular2/http";
import Rx from "rxjs/Rx";
import {ChannelService, ChannelEvent} from "./services/channel.service";
class StatusEvent {
State: string;
PercentComplete: number;
}
@Component({
selector: 'task',
template: `
<div>
<h4>Task component bound to '{{eventName}}'</h4>
</div>
<div class="commands">
<textarea
class="console"
cols="50"
rows="15"
disabled
[value]="messages"></textarea>
<div class="commands__input">
<button (click)="callApi()">Call API</button>
</div>
</div>
`
})
export class TaskComponent implements OnInit {
@Input() eventName: string;
@Input() apiUrl: string;
messages = "";
private channel = "tasks";
constructor(
private http: Http,
private channelService: ChannelService
) {
}
ngOnInit() {
// Get an observable for events emitted on this channel
//
this.channelService.sub(this.channel).subscribe(
(x: ChannelEvent) => {
switch (x.Name) {
case this.eventName: { this.appendStatusUpdate(x); }
}
},
(error: any) => {
console.warn("Attempt to join channel failed!", error);
}
)
}
private appendStatusUpdate(ev: ChannelEvent): void {
// Just prepend this to the messages string shown in the textarea
//
let date = new Date();
switch (ev.Data.State) {
case "starting": {
this.messages = `${date.toLocaleTimeString()} : starting\n` + this.messages;
break;
}
case "complete": {
this.messages = `${date.toLocaleTimeString()} : complete\n` + this.messages;
break;
}
default: {
this.messages = `${date.toLocaleTimeString()} : ${ev.Data.State} : ${ev.Data.PercentComplete} % complete\n` + this.messages;
}
}
}
callApi() {
this.http.get(this.apiUrl)
.map((res: Response) => res.json())
.subscribe((message: string) => { console.log(message); });
}
}
I tried to map the SignalR concepts into observables, but I'm still learning how to effectively use RxJS. In any case I hope that helps show how this might work in the context of an Angular 2 app.
You can also try using ng2-signalr.
npm install ng2-signalr --save
Here is the link to the source.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With