
Advice on this Angular + ngRx (effects) - what about websocket events?

So I'm experimenting with ngrx & ngrx/effects by building the following sandbox:

https://stackblitz.com/edit/ngrx-vanilla

Quick intro:

  • it has a root store in app/store
  • it has lazy loading of two modules in app/features
  • it has singleton services in app/commons

Three pages:

  • action items: routing to this page triggers the random generation of three goofy corporate action items
  • users: a basic master > detail redux implementation with router support
  • meeting: the page that raises my question; click "start meeting" to witness a relevant exchange of ideas.

Question & context:

  • I understand that all data updates in redux are to happen via actions
  • the "effects" library handles async events, dispatching new actions in response to 3rd party events and async calls.
  • the app/common/meeting/service imitates the behavior of, for instance, a websocket or a Firebase realtime DB pushing updates.

Upon receiving an update (illustrated in app/store/effects/meeting.effects.ts), a new action is dispatched.

Finally, the question: Is it clean practice to have a common service know about the store? Where is the best place to register a listener to a websocket / Firebase realtime DB in order to dispatch actions when data is pushed?

Here, I made it so that an effect (meeting.effects) reacts to the meetingActions.START_MEETING action type and, whenever data is pushed, dispatches an update to the store. This feels wrong to me for several reasons:

  • Hard to unit test in isolation (needs more context than itself)
  • In case of a "stop meeting" action, this approach needs to store a Subscription (or something similar?) in order to stop listening. In my approach, there's no control over the observable once it has been created.

How are such cases usually handled?

Jem asked Mar 02 '18 20:03


3 Answers

Assuming the websocket emits different kinds of events, map each event to an action in a websocket service, like this:

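@Injectable()
class WebsocketService {
    private socket$: Observable<Action>;

    getActions(): Observable<Action> {
        if (!this.socket$) {
            // url and functionToMapActions are placeholders defined elsewhere.
            // In RxJS 6+, use webSocket(url) imported from 'rxjs/webSocket'.
            this.socket$ = Observable.webSocket(url).pipe(
                map(functionToMapActions),
                // share one connection among all subscribers
                shareReplay(1)
            );
        }
        return this.socket$;
    }
}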

Here functionToMapActions maps webSocket events to actions. I suggest adding the shareReplay operator at the end so that all subscribers share a single webSocket connection.
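
As a rough sketch of what such a mapping function could look like: the message shape (`kind` plus `data`), the action type strings, and the `SocketMessage` and `AppAction` names below are all hypothetical and stand in for whatever your server and store actually use.

```typescript
// Hypothetical wire format; a real server defines its own.
interface SocketMessage {
  kind: string;
  data?: unknown;
}

// Minimal action shape, matching the `type` field that NgRx actions carry.
interface AppAction {
  type: string;
  payload?: unknown;
}

// Assumed lookup from message kind to action type.
const ACTION_TYPE_BY_KIND = new Map<string, string>([
  ['meeting.update', '[Meeting] Update Received'],
  ['meeting.end', '[Meeting] Ended'],
]);

// Maps one socket message to one action. Unknown kinds become an explicit
// "unknown message" action instead of being dropped silently.
function functionToMapActions(message: SocketMessage): AppAction {
  const type = ACTION_TYPE_BY_KIND.get(message.kind);
  if (type === undefined) {
    return { type: '[Socket] Unknown Message', payload: message };
  }
  return { type, payload: message.data };
}
```

Keeping the mapping a pure function means it can be unit tested without opening a socket or setting up a store.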

Observable.webSocket connects to the webSocket and emits events as they arrive.

The webSocket connection is established when you subscribe to websocketService.getActions().

You can subscribe to the websocket actions when the effects are initialized:

@Effect()
init$ = this.websocketService.getActions();

This emits all actions as soon as your app starts (if the effect is in the root module) or as soon as your module is loaded (if the effect is in a lazy-loaded module).

Or, if you are interested in only a limited set of actions, you can filter them like this:

@Effect()
init$ = this.websocketService.getActions().pipe(filter(filterNeededActionsHere));
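
The predicate passed to filter can be an ordinary function. A minimal sketch, assuming actions carry a string `type` and that only the two action types listed are of interest (both type strings and the `NEEDED_TYPES` name are hypothetical):

```typescript
// Minimal action shape, matching the `type` field that NgRx actions carry.
interface AppAction {
  type: string;
}

// Hypothetical allow-list of action types this effect should let through.
const NEEDED_TYPES: ReadonlySet<string> = new Set([
  '[Meeting] Update Received',
  '[Meeting] Ended',
]);

// Returns true only for actions whose type is in the allow-list.
function filterNeededActionsHere(action: AppAction): boolean {
  return NEEDED_TYPES.has(action.type);
}
```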

You can also start listening to actions only after a particular event, like this:

@Effect()
init$ = this.actions$.pipe(
    ofType('Event which marks start'),
    switchMapTo(this.websocketService.getActions())
);

As in the previous example, you can also filter the actions here.

Hope this answers your question.

AbdulKareem answered Sep 20 '22 10:09


NgRx v9 changed the syntax a little. Use the following code in a root-level effects class:

init$ = createEffect(() => 
  this.actions$.pipe(
    ofType(ROOT_EFFECTS_INIT),
    // websocket events handling logic
    switchMap(() => webSocketEvents$)
  )
);

This is just a slightly adapted example from the docs.

If you work with a feature-level effects class, ROOT_EFFECTS_INIT won't work; you need to use the OnRunEffects lifecycle hook:

class UserEffects implements OnRunEffects  {
  ngrxOnRunEffects(resolvedEffects$: Observable<EffectNotification>) {
    // websocket events handling logic
    return webSocketEvents$.pipe(
      exhaustMap(() => resolvedEffects$)
    );
  }
}

A more detailed example is in the docs.

webSocketEvents$ in both examples is an Observable that can be constructed using one of these RxJS functions:

webSocket(socketUrl) // if you want to manage the websocket connection via rxjs
fromEvent(socketIoClient, eventName) // useful if the websocket connection is handled by socket.io-client; eventName is a placeholder for the event to listen to
im.pankratov answered Sep 20 '22 10:09


A custom example of a WebSocket emitting different kinds of events for a Vehicle entity, using an adapter:

1. Create a Vehicle-Socket-Adapter.ts

import { fromEvent, merge, Observable } from 'rxjs';
import { map } from 'rxjs/operators';
import { SocketIoService } from 'src/app/shared/services/socket-io.service';

export class VehicleSocketAdapter {
  public onVehicleEngineStateChange: Observable<boolean>;
  constructor(vehicleId: string, private socketIoService: SocketIoService) {

    // merge (imported from 'rxjs') combines multiple ws events into one Observable

    this.onVehicleEngineStateChange = merge(
      fromEvent(this.socketIoService.socket, `event.vehicle.${vehicleId}.start`).pipe(
        map(vehicleStatus => true)
      ),
      fromEvent(this.socketIoService.socket,`event.vehicle.${vehicleId}.stop`).pipe(
        map(vehicleStatus => false)
      ),
    )
  }
}

2. Later, import the adapter wherever you would like to use it, for example in app.component.ts

private subscriptions = new Subscription();

private listenForVehiclesState(vehicles) {
  vehicles.forEach((vehicle) => {
    const vehicleAdapter = new VehicleSocketAdapter(vehicle.id, this.webSocket);
    this.subscriptions.add(
      vehicleAdapter.onVehicleEngineStateChange.subscribe(vehicleStatus => {
        // dispatch an action whenever the adapter emits
        this.store.dispatch(
          VehiclesActions.BeginUpdateVehiclesStatusAction({
            payload: {
              vehicleId: vehicle.id,
              status: vehicleStatus
            }
          })
        );
      })
    );
  });
}

3. Don't forget to unsubscribe from all subscriptions when your view or component is destroyed: this.subscriptions.unsubscribe();

Bonus: Socket as service snippet :)

import { Injectable } from '@angular/core';
import * as io from 'socket.io-client';

@Injectable({
  providedIn: 'root'
})
export class SocketIoService {

  public socket: SocketIOClient.Socket = io('/', {
    path: '/api/livedata',
    transportOptions: { polling: { extraHeaders: { 'AuthToken': 'ifAny' } } }
  });

  constructor() { }
}

Happy Coding :)

Divek John answered Sep 20 '22 10:09
