So I'm experimenting with ngrx & ngrx/effects by building the following sandbox:
https://stackblitz.com/edit/ngrx-vanilla
Quick intro:
Three pages:
Question & context:
Upon receiving an update (illustrated in app/store/effects/meeting.effects.ts), a new action is dispatched.
Finally, the question: is it clean practice to have a common service know about the store? Where is the best place to register a listener on a websocket / Firebase realtime DB in order to dispatch actions when data is pushed?
Here, I made it so that an effect (meeting.effects) reacts to the meetingActions.START_MEETING action type and, whenever data is pushed, dispatches an update order to the store, but this feels wrong for a series of reasons:
How are such cases usually handled?
Assuming the websocket emits different kinds of events, map each event to a different action in a websocket service like this:
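import { Injectable } from '@angular/core';
import { Action } from '@ngrx/store';
import { Observable } from 'rxjs';
import { map, shareReplay } from 'rxjs/operators';
import { webSocket } from 'rxjs/webSocket';

@Injectable()
class WebsocketService {
  private socket$?: Observable<Action>;

  getActions(): Observable<Action> {
    // Create the stream lazily; later callers reuse the same one.
    if (!this.socket$) {
      // url and functionToMapActions are placeholders you need to supply.
      this.socket$ = webSocket(url).pipe(
        map(functionToMapActions),
        shareReplay(1)
      );
    }
    return this.socket$;
  }
}
where functionToMapActions maps webSocket events to actions. I suggest adding the shareReplay operator at the end so that the webSocket is only read once.
webSocket (Observable.webSocket in RxJS 5) connects to the websocket and emits events as they arrive. The connection is established when you first subscribe to websocketService.getActions().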
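As a rough illustration of what functionToMapActions might look like, here is a minimal sketch. The message shape (type and data fields), the event names, and the action type strings are all assumptions made up for this example; they do not come from the sandbox or from any particular server. The Action interface is a local structural stand-in for the one in @ngrx/store so that the snippet has no imports.

```typescript
// Local stand-in for the NgRx Action interface (assumption: only `type` is required).
interface Action {
  type: string;
}

// Hypothetical action shape carrying whatever the server sent.
interface PayloadAction extends Action {
  payload?: unknown;
}

// Hypothetical shape of a message pushed over the websocket.
interface SocketMessage {
  type: string;
  data?: unknown;
}

// Hypothetical lookup from server event names to action types.
const EVENT_TO_ACTION_TYPE = new Map<string, string>([
  ['meeting.updated', '[Meeting] Update Received'],
  ['meeting.ended', '[Meeting] Ended'],
]);

function functionToMapActions(message: SocketMessage): PayloadAction {
  const actionType = EVENT_TO_ACTION_TYPE.get(message.type);
  if (actionType === undefined) {
    // Unknown events still become an action, so the stream only ever emits actions.
    return { type: '[WebSocket] Unknown Event', payload: message };
  }
  return { type: actionType, payload: message.data };
}
```

Keeping the mapping in a plain function like this makes it easy to unit test without opening a real socket.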
You can subscribe to the websocket actions at @Effect initialization:
@Effect()
init$ = this.websocketService.getActions();
This will emit all actions as soon as your app starts (if the effect is in the root module), or as soon as your module is loaded (if it is in a lazy-loaded module).
Or, if you are only interested in a limited set of actions, you can filter them like this:
@Effect()
init$ = this.websocketService.getActions().pipe(filter(filterNeededActionsHere));
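The filterNeededActionsHere predicate is left undefined in the snippet above. One possible shape, assuming you want to let through only a fixed set of action types, is sketched here. The type strings are invented for the example, and Action is a local stand-in for the @ngrx/store interface.

```typescript
// Local stand-in for the NgRx Action interface.
interface Action {
  type: string;
}

// Hypothetical allow-list of action types this effect cares about.
const NEEDED_ACTION_TYPES = new Set<string>([
  '[Meeting] Update Received',
  '[Meeting] Ended',
]);

// Returns true only for actions whose type is on the allow-list.
function filterNeededActionsHere(action: Action): boolean {
  return NEEDED_ACTION_TYPES.has(action.type);
}
```

Because it is a plain predicate, it can be passed directly to the RxJS filter operator.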
You can also start listening to actions only after a particular event, like this:
@Effect()
init$ = this.actions$.pipe(
  ofType('Event which marks start'),
  switchMapTo(this.websocketService.getActions())
);
As in the previous example, you can also filter the actions here, same as before.
Hope this answers your question.
NgRx v9 changed the syntax a little. Use the following code for a root-level effects class:
init$ = createEffect(() =>
  this.actions$.pipe(
    ofType(ROOT_EFFECTS_INIT),
    // websocket events handling logic
    switchMap(() => webSocketEvents$)
  )
);
This is just a slightly adapted example from the docs.
If you work with a feature-level effects class, ROOT_EFFECTS_INIT won't work; you need to use the OnRunEffects lifecycle hook:
class UserEffects implements OnRunEffects {
  ngrxOnRunEffects(resolvedEffects$: Observable<EffectNotification>) {
    // websocket events handling logic
    return webSocketEvents$.pipe(
      exhaustMap(() => resolvedEffects$)
    );
  }
}
A more detailed example is in the docs.
webSocketEvents$ in both examples is an Observable that can be constructed using one of the RxJS functions:
webSocket(socketUrl) // if you want to manage the websocket connection via RxJS
fromEvent(socketIoClient, eventName) // useful if the websocket connection is handled by socket.io-client; eventName is a placeholder for the event to listen for
A custom example of a WebSocket emitting different kinds of events for a Vehicle entity, using an adapter:
1. Create a Vehicle-Socket-Adapter.ts
import { fromEvent, merge, Observable } from 'rxjs';
import { map } from 'rxjs/operators';
import { SocketIoService } from 'src/app/shared/services/socket-io.service';

export class VehicleSocketAdapter {
  public onVehicleEngineStateChange: Observable<boolean>;

  constructor(vehicleId: string, private socketIoService: SocketIoService) {
    // using merge from rxjs to combine multiple ws events into one Observable
    this.onVehicleEngineStateChange = merge(
      fromEvent(this.socketIoService.socket, `event.vehicle.${vehicleId}.start`).pipe(
        map(vehicleStatus => true)
      ),
      fromEvent(this.socketIoService.socket, `event.vehicle.${vehicleId}.stop`).pipe(
        map(vehicleStatus => false)
      ),
    );
  }
}
2. Later, import the adapter wherever you would like to use it, for example in app.component.ts:
private subscriptions = new Subscription();

private listenForVehiclesState(vehicles) {
  vehicles.forEach((vehicle) => {
    const vehicleAdapter = new VehicleSocketAdapter(vehicle.id, this.webSocket);
    this.subscriptions.add(
      vehicleAdapter.onVehicleEngineStateChange.subscribe(vehicleStatus => {
        // dispatch an action whenever the adapter emits
        this.store.dispatch(
          VehiclesActions.BeginUpdateVehiclesStatusAction({
            payload: {
              vehicleId: vehicle.id,
              status: vehicleStatus
            }
          })
        );
      })
    );
  });
}

// Clean up all socket subscriptions when the component is destroyed.
ngOnDestroy() {
  this.subscriptions.unsubscribe();
}
Bonus: Socket as service snippet :)
import { Injectable } from '@angular/core';
import * as io from 'socket.io-client';

@Injectable({
  providedIn: 'root'
})
export class SocketIoService {
  public socket: SocketIOClient.Socket = io('/', {
    path: '/api/livedata',
    transportOptions: { polling: { extraHeaders: { 'AuthToken': 'ifAny' } } }
  });

  constructor() { }
}
Happy Coding :)