
Angular 2 Observable with multiple subscribers

Tags: angular, rxjs

I have an Angular 2 service that fetches data from an API. The service has three subscribers (defined in components), each doing something different with the data (rendering different graphs).

I'm noticing that three GET requests are sent to the API, whereas what I want is a single request whose result is shared by all subscribers. I've looked into hot and cold observables and tried calling .share() on the observable, but I'm still making three individual calls.

Update, adding code

Service

    import { Injectable } from '@angular/core';
    import { Http, Response } from '@angular/http';

    import { Observable } from 'rxjs/Rx';

    // Import RxJs required methods
    import 'rxjs/add/operator/map';
    import 'rxjs/add/operator/catch';

    import { StationCompliance } from './model/StationCompliance';

    @Injectable()
    export class StationComplianceService {

      private url = '/api/read/stations';

      constructor(private http: Http) {
        console.log('Started Station compliance service');
      }

      getStationCompliance(): Observable<StationCompliance[]> {
        return this.http.get(this.url)
          .map((res: Response) => res.json())
          .catch((error: any) => Observable.throw(error.json().error || 'Server Error'));
      }
    }

Component 1

    import { Component, OnInit } from '@angular/core';
    import { CHART_DIRECTIVES } from 'angular2-highcharts';

    import { StationComplianceService } from '../station-compliance.service';

    @Component({
      selector: 'app-up-down-graph',
      templateUrl: './up-down-graph.component.html',
      styleUrls: ['./up-down-graph.component.css']
    })
    export class UpDownGraphComponent implements OnInit {

      graphData;
      errorMessage: string;
      options;

      constructor(private stationService: StationComplianceService) { }

      ngOnInit() {
        this.getStationTypes();
      }

      getStationTypes() {
        this.stationService.getStationCompliance()
          .subscribe(
            graphData => {
              this.graphData = graphData;
              this.options = {
                chart: {
                  type: 'pie',
                  plotShadow: true
                },
                plotOptions: {
                  showInLegend: true
                },
                title: { text: 'Up and Down devices' },
                series: [{
                  data: this.processStationType(this.graphData)
                }]
              };
            },
            error => this.errorMessage = <any>error
          );
      }

      // The rest of the class, including processStationType(), was not shown in the original post.
    }

The other two components are almost the same; they just show different graphs.

Asked by naoru, Sep 21 '16 22:09



2 Answers

I encountered a similar problem and solved it by following Aran's suggestion to read Cory Rylan's "Angular 2 Observable Data Services" blog post. The key for me was using a BehaviorSubject. Here are the snippets of code that ultimately worked for me.

Data Service:

The data service creates an internal BehaviorSubject to cache the data once when the service is initialized. Consumers use the subscribeToDataService() method to access the data.

    import { Injectable } from '@angular/core';
    import { Http, Response } from '@angular/http';

    import { BehaviorSubject } from 'rxjs/BehaviorSubject';
    import { Observable } from 'rxjs/Observable';

    // Operators used on the http.get() observable below
    import 'rxjs/add/operator/map';
    import 'rxjs/add/operator/catch';

    import { Data } from './data';
    import { properties } from '../../properties';

    @Injectable()
    export class DataService {
      allData: Data[] = new Array<Data>();
      allData$: BehaviorSubject<Data[]>;

      constructor(private http: Http) {
        this.initializeDataService();
      }

      initializeDataService() {
        if (!this.allData$) {
          // Start with an empty array so early subscribers get a value immediately.
          this.allData$ = <BehaviorSubject<Data[]>> new BehaviorSubject(new Array<Data>());

          // Single HTTP request; its result is pushed to every subscriber.
          this.http.get(properties.DATA_API)
            .map(this.extractData)
            .catch(this.handleError)
            .subscribe(
              allData => {
                this.allData = allData;
                this.allData$.next(allData);
              },
              error => console.log("Error subscribing to DataService: " + error)
            );
        }
      }

      subscribeToDataService(): Observable<Data[]> {
        return this.allData$.asObservable();
      }

      // other methods (including extractData and handleError) have been omitted

    }

Component:

Components can subscribe to the data service upon initialization.

    export class TestComponent implements OnInit {
      allData$: Observable<Data[]>;

      constructor(private dataService: DataService) {
      }

      ngOnInit() {
        this.allData$ = this.dataService.subscribeToDataService();
      }

    }

Component Template:

The template can then iterate over the observable as necessary using the async pipe.

    *ngFor="let data of allData$ | async"  

Subscribers are updated each time the next() method is called on the BehaviorSubject in the data service.
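To make that behaviour concrete, here is a minimal, dependency-free sketch of what a BehaviorSubject does conceptually. It is not the RxJS implementation: `TinyBehaviorSubject`, `Handler`, and the sample values are hypothetical names made up for illustration. The point it shows is that a late subscriber still receives the most recent value, which is why one HTTP request can serve every component.

```typescript
// Hypothetical, simplified model of a BehaviorSubject (not the RxJS class).
type Handler<T> = (value: T) => void;

class TinyBehaviorSubject<T> {
  private handlers: Handler<T>[] = [];

  // A BehaviorSubject always holds a current value, so it needs an initial one.
  constructor(private current: T) {}

  // New subscribers immediately receive the latest value, then all later ones.
  // Returns a function that removes the subscription.
  subscribe(handler: Handler<T>): () => void {
    this.handlers.push(handler);
    handler(this.current);
    return () => {
      this.handlers = this.handlers.filter(h => h !== handler);
    };
  }

  // Store the new value and push it to every current subscriber.
  next(value: T): void {
    this.current = value;
    this.handlers.forEach(h => h(value));
  }
}

const stations$ = new TinyBehaviorSubject<number[]>([]);
const seenByA: number[][] = [];
const seenByB: number[][] = [];

stations$.subscribe(v => seenByA.push(v)); // A first receives the initial []
stations$.next([1, 2, 3]);                 // stands in for the HTTP response arriving
stations$.subscribe(v => seenByB.push(v)); // B subscribes late and still receives [1, 2, 3]
```

In this sketch, subscriber A sees the empty initial array followed by the data, while subscriber B sees only the data. A component using the real BehaviorSubject has to tolerate that initial empty value in the same way.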

Answered by Scott, Sep 21 '22 21:09


The issue in your code is that you return a new observable each time getStationCompliance() is called, because http.get() creates a new Observable on every call. share() only helps subscribers of the same Observable instance, so sharing three separate observables still produces three requests. One way to solve this is to store the observable in a private field of the service, which ensures that all of the subscribers subscribe to the same observable. This isn't perfect code, but I had a similar issue and this solved my problem for the time being.

    import { Injectable } from '@angular/core';
    import { Http, Response } from '@angular/http';

    import { Observable } from 'rxjs/Rx';

    // Import RxJs required methods
    import 'rxjs/add/operator/map';
    import 'rxjs/add/operator/catch';
    import 'rxjs/add/operator/debounceTime';
    import 'rxjs/add/operator/share';
    import 'rxjs/add/operator/finally';

    import { StationCompliance } from './model/StationCompliance';

    @Injectable()
    export class StationComplianceService {

      private url = '/api/read/stations';

      // Holds the in-flight request so that concurrent callers share it.
      private stationComplianceObservable: Observable<StationCompliance[]>;

      constructor(private http: Http) {
        console.log('Started Station compliance service');
      }

      getStationCompliance(): Observable<StationCompliance[]> {

        if (this.stationComplianceObservable) {
          return this.stationComplianceObservable;
        }

        this.stationComplianceObservable = this.http.get(this.url)
          // RxJS 5's debounce() expects a duration selector; debounceTime() takes milliseconds.
          .debounceTime(1000)
          .share()
          .map((res: Response) => res.json())
          // Arrow function so that `this` still refers to the service.
          .finally(() => { this.stationComplianceObservable = null; })
          .catch((error: any) => Observable.throw(error.json().error || 'Server Error'));

        return this.stationComplianceObservable;
      }
    }
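The difference between returning a fresh observable per call and returning one stored instance can be sketched without RxJS at all. The following is a simplified model under stated assumptions, not the RxJS API: `ColdSource`, `fakeHttpGet`, `shareOnce`, and both service classes are hypothetical names, and `shareOnce` behaves roughly like sharing with replay rather than exactly like share(). Unlike the service above, the cached variant here never resets its stored source.

```typescript
// Hypothetical, dependency-free model of cold sources; not the RxJS API.
type Callback<T> = (value: T) => void;
type ColdSource<T> = (callback: Callback<T>) => void; // "subscribing" runs the producer

let requestsSent = 0;

// Like http.get(): every call returns a fresh cold source, and every
// subscription to that source performs its own request.
function fakeHttpGet(): ColdSource<string[]> {
  return callback => {
    requestsSent++;
    callback(['station-a', 'station-b']);
  };
}

// Runs the wrapped source at most once and hands the result to all subscribers.
function shareOnce<T>(source: ColdSource<T>): ColdSource<T> {
  let started = false;
  let done = false;
  let result: T | undefined;
  const waiting: Callback<T>[] = [];
  return callback => {
    if (done) { callback(result as T); return; }
    waiting.push(callback);
    if (!started) {
      started = true;
      source(value => {
        result = value;
        done = true;
        waiting.splice(0).forEach(cb => cb(value));
      });
    }
  };
}

// Sharing is applied, but to a new source on every call, so it has no effect.
class UncachedService {
  getStations(): ColdSource<string[]> {
    return shareOnce(fakeHttpGet());
  }
}

// The shared source is stored, so every caller gets the same instance.
class CachedService {
  private stations?: ColdSource<string[]>;
  getStations(): ColdSource<string[]> {
    if (!this.stations) {
      this.stations = shareOnce(fakeHttpGet());
    }
    return this.stations;
  }
}

const uncached = new UncachedService();
for (let i = 0; i < 3; i++) { uncached.getStations()(() => {}); }
const uncachedRequests = requestsSent; // one request per subscriber

requestsSent = 0;
const cached = new CachedService();
for (let i = 0; i < 3; i++) { cached.getStations()(() => {}); }
const cachedRequests = requestsSent; // a single request for all subscribers
```

With three subscribers, the uncached service in this model sends three requests and the cached one sends a single request, which mirrors the behaviour described in the question and the fix described in this answer.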
Answered by Ray Suelzer, Sep 20 '22 21:09