
Hot and shared Observable from an EventEmitter

Is there a way to have a hot observable from an EventEmitter (or an equivalent available in Angular 2 alpha 46 / RxJS 5 alpha)? That is, if we subscribe after the value has resolved, the subscriber should still be triggered with the previously resolved value, similar to what we get when always returning the same promise.

Ideally, only using Angular 2 objects (I read somewhere a light RxJS would be embedded later to remove the dependency), otherwise importing RxJS is fine. AsyncSubject seems to match my need, but it is not available in RxJS 5 alpha.

I tried the following, without success (never triggers). Any idea about how to use it?

let emitter = new EventEmitter<MyObj>();
setTimeout(() => {emitter.next(new MyObj());});
this.observable = emitter;
return this.observable.share();

Full plunker here comparing hot and cold

Use case: fetch some async objects only once (for example a series of HTTP calls merged/wrapped in a new EventEmitter), but provide the resolved async object to any service/component subscribing to it, even if they subscribe after it is resolved (that is, after the HTTP responses are received).

EDIT: the question is not about how to merge HTTP responses, but about how to get a (hot?) observable from EventEmitter, or any equivalent available with Angular 2 alpha 46 / RxJS 5 alpha, that allows subscribing after the async result has been retrieved/resolved (HTTP is just an example of an async source). myEventEmitter.share() does not work (cf. plunker above), although it works with the Observable returned by HTTP (cf. plunker from @Eric Martinez). And as of Angular 2 alpha 46, the .toRx() method no longer exists; the EventEmitter is itself the observable and subject.

This works well with promises as long as we always return the same promise object. Since observables were introduced with the Angular 2 HTTP services, I would like to avoid mixing promises and observables (and observables are said to be more powerful than promises, so they should allow what is easy with promises).

Specs about share() (I haven't found doc for version 5 alpha - version used by Angular 2) - working on the Observable returned by the Angular 2 HTTP service, not working on EventEmitter.

EDIT: clarified why not using the Observable returned by HTTP and added that not using RxJS directly would be even better.

EDIT: changed description: the concern is about multiple subscriptions, not merging the HTTP results.

Thanks!

Antoine OL asked Sep 27 '22 01:09


2 Answers

The functionality you seem to be describing is not that of a cold observable but rather that of an Rx.BehaviorSubject. Have a look here for an explanation of RxJS subjects: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/subjects.md.

I quote from there :

BehaviorSubject is similar to ReplaySubject, except that it only stored the last value it published. BehaviorSubject also requires a default value upon initialization. This value is sent to observers when no other value has been received by the subject yet. This means that all subscribers will receive a value instantly on subscribe, unless the Subject has already completed.
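To make the quoted behaviour concrete, here is a minimal sketch that does not use RxJS at all. TinyBehaviorSubject is a hypothetical stand-in written for illustration only (it ignores completion, errors and unsubscription), so treat it as a model of the semantics rather than the library's implementation.

```typescript
// Hypothetical stand-in for illustration; NOT the RxJS BehaviorSubject.
type Listener<T> = (value: T) => void;

class TinyBehaviorSubject<T> {
  private listeners: Listener<T>[] = [];
  private current: T;

  // A default value is mandatory, as in the description quoted above.
  constructor(initial: T) {
    this.current = initial;
  }

  next(value: T): void {
    this.current = value; // remember only the latest value
    this.listeners.forEach(l => l(value));
  }

  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
    listener(this.current); // deliver the latest (or default) value immediately
  }
}

const earlyValues: string[] = [];
const lateValues: string[] = [];

const behavior = new TinyBehaviorSubject<string>("default");
behavior.subscribe(v => earlyValues.push(v)); // gets "default" right away
behavior.next("a");
behavior.next("b");
behavior.subscribe(v => lateValues.push(v)); // gets only "b", the latest value

console.log(earlyValues); // ["default", "a", "b"]
console.log(lateValues);  // ["b"]
```

The late subscriber never sees "default" or "a": only the most recent value is kept.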

The Rx.AsyncSubject would be the closest in behaviour to a promise :

AsyncSubject is similar to the Replay and Behavior subjects, however it will only store the last value, and only publish it when the sequence is completed. You can use the AsyncSubject type for situations when the source observable is hot and might complete before any observer can subscribe to it. In this case, AsyncSubject can still provide the last value and publish it to any future subscribers.
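The promise-like behaviour described in that quote can be sketched the same way. TinyAsyncSubject below is a hypothetical, RxJS-free model (no error handling, no unsubscription), meant only to show that nothing is delivered until completion and that late subscribers still receive the last value.

```typescript
// Hypothetical stand-in for illustration; NOT the RxJS AsyncSubject.
type Listener<T> = (value: T) => void;

class TinyAsyncSubject<T> {
  private listeners: Listener<T>[] = [];
  private last: T | undefined = undefined;
  private hasValue = false;
  private completed = false;

  next(value: T): void {
    if (this.completed) return;
    this.last = value; // store, but do not publish yet
    this.hasValue = true;
  }

  complete(): void {
    if (this.completed) return;
    this.completed = true;
    // Publish the last value once, now that the sequence is over.
    if (this.hasValue) this.listeners.forEach(l => l(this.last as T));
    this.listeners = [];
  }

  subscribe(listener: Listener<T>): void {
    if (!this.completed) {
      this.listeners.push(listener); // wait for completion
      return;
    }
    if (this.hasValue) listener(this.last as T); // late subscriber: replay last value
  }
}

const beforeCompletion: number[] = [];
const afterCompletion: number[] = [];

const asyncSubject = new TinyAsyncSubject<number>();
asyncSubject.subscribe(v => beforeCompletion.push(v));
asyncSubject.next(1);
asyncSubject.next(2);
const seenBeforeComplete = beforeCompletion.length; // still 0: nothing published yet
asyncSubject.complete(); // early subscriber now receives 2
asyncSubject.subscribe(v => afterCompletion.push(v)); // late subscriber also receives 2

console.log(seenBeforeComplete, beforeCompletion, afterCompletion); // 0 [2] [2]
```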

Two more comments:

  • In your plunker: this._coldObservable = emitter.share(); Using share() returns a hot observable!
  • EventEmitter actually extends Subject in the first place.

UPDATE: Wrapping an EventEmitter in an Rx.Observable:

function toRx(eventEmitter) {
  return Rx.Observable.create(function (observer) {
    // Forward every value emitted by the EventEmitter to the Rx observer
    eventEmitter.subscribe(function listener(value) { observer.onNext(value); });
    // Ideally you also manage error and completion, if that makes sense with Angular2
    return function () {
      /* manage end of subscription here */
    };
  });
}

Once you have that Rx.Observable, you can apply share(), shareReplay(1), anything you want.
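As a rough model of why a merely hot stream is not enough for this use case, while a one-value replay is, consider the sketch below. TinySubject is a hypothetical class written for this illustration (not an RxJS type): a buffer size of 0 approximates a plain multicast subject, which is what share() relies on, and a buffer size of 1 approximates the replay-last-value behaviour of shareReplay(1).

```typescript
// Hypothetical stand-in for illustration; NOT an RxJS class.
type Listener<T> = (value: T) => void;

class TinySubject<T> {
  private listeners: Listener<T>[] = [];
  private buffer: T[] = [];
  private bufferSize: number;

  // bufferSize 0: hot, no history. bufferSize 1: hot, replays the last value.
  constructor(bufferSize: number) {
    this.bufferSize = bufferSize;
  }

  next(value: T): void {
    this.buffer.push(value);
    while (this.buffer.length > this.bufferSize) this.buffer.shift();
    this.listeners.forEach(l => l(value));
  }

  subscribe(listener: Listener<T>): void {
    this.buffer.forEach(v => listener(v)); // replay whatever is buffered
    this.listeners.push(listener);
  }
}

const plainLate: string[] = [];
const replayLate: string[] = [];

const plainSubject = new TinySubject<string>(0);
const replaySubject = new TinySubject<string>(1);

// The async result arrives before anyone has subscribed.
plainSubject.next("response");
replaySubject.next("response");

plainSubject.subscribe(v => plainLate.push(v));   // misses "response"
replaySubject.subscribe(v => replayLate.push(v)); // receives "response" on subscribe

// Both still receive values emitted after they subscribed.
plainSubject.next("update");
replaySubject.next("update");

console.log(plainLate);  // ["update"]
console.log(replayLate); // ["response", "update"]
```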

My bet is that the Angular team will sooner or later propose a bridging function, but if you don't want to wait, you can do it yourself.

user3743222 answered Oct 21 '22 13:10


ReplaySubject does what I was looking for. @robwormald provided a working example on Gitter, which I slightly modified to better demonstrate the behaviour.

Exposing HTTP response:

import {Injectable} from 'angular2/angular2';
import {Http} from 'angular2/http';
import {ReplaySubject} from '@reactivex/rxjs/dist/cjs/Rx';

@Injectable()
export class PeopleService {
  constructor(http:Http) {
    // Buffer of size 1: late subscribers receive the last emitted value
    this.people = new ReplaySubject(1);

    // The request is made once; the subject relays the parsed response
    http.get('api/people.json')
      .map(res => res.json())
      .subscribe(this.people);
  }
}

Subscribing multiple times:

// ... annotations
export class App {
  constructor(peopleService:PeopleService) {
    // The subject exposed by the service
    const people = peopleService.people;

    people.subscribe(v => {
      console.log(v)
    });

    //some time later

    setTimeout(() => {
      people.subscribe(v => {
        console.log(v)
      });
      people.subscribe(v => {
        console.log(v)
      });
    },2000)
  }
}

Full plunker

EDIT: BehaviorSubject is an alternative. In this use case, the difference is the initial value, which is useful for example if we want to display content from a cache before updating it with the HTTP response.

Antoine OL answered Oct 21 '22 13:10