Deferred pattern with RxJS 5 observables

For an arbitrary promise implementation, the deferred pattern (not to be confused with the deferred antipattern) may look like:

const deferred = new Deferred;
...
// scopes where `deferred` object reference was passed before promise settlement
deferred.promise.then((result) => { ... }, (error) => { ... });
...
deferred.resolve(...);
// doesn't affect promise state
deferred.reject();
...
// after promise settlement
deferred.promise.then((result) => { ... }, (error) => { ... });

The deferred object holds an unsettled promise that can be passed to other function scopes by reference. All promise chains are executed on promise settlement; it doesn't matter whether deferred.promise was settled before or after chaining with then. The state of the promise cannot be changed once it has been settled.
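For reference, the semantics described above can be sketched with the native Promise constructor. This is only a minimal illustration: `Deferred` here is a hypothetical helper class written for this example, not a built-in, and the names `seen` and `done` are likewise made up.

```javascript
// Minimal Deferred built on the native Promise constructor.
// `Deferred` is an illustrative name, not part of any standard library.
class Deferred {
  constructor() {
    this.promise = new Promise((resolve, reject) => {
      // Expose the settle functions so they can be called from outside.
      this.resolve = resolve;
      this.reject = reject;
    });
  }
}

const deferred = new Deferred();
const seen = [];

// Handler attached before settlement.
deferred.promise.then((result) => seen.push(`early: ${result}`));

deferred.resolve('first');
deferred.resolve('second');            // ignored: the promise is already resolved
deferred.reject(new Error('ignored')); // ignored as well

// Handler attached after settlement still receives the same value.
const done = deferred.promise
  .then((result) => seen.push(`late: ${result}`))
  .then(() => console.log(seen)); // both entries carry 'first', early before late
```

The point of the sketch is that only the first `resolve`/`reject` call has any effect, and that early and late handlers observe the same outcome. That is the behaviour the RxJS version is expected to reproduce.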


As the answer suggests, the initial choices are ReplaySubject and AsyncSubject.

For the given setup (a demo)

var subject = new Rx.AsyncSubject;
var deferred = subject.first();

deferred.subscribe(
  console.log.bind(console, 'Early result'),
  console.log.bind(console, 'Early error')
);

setTimeout(() => {
  deferred.subscribe(
    console.log.bind(console, 'Late result'),
    console.log.bind(console, 'Late error')
  );
});

This results in desirable behaviour:

subject.error('one');
subject.next('two');

Early error one

Late error one

This results in undesirable behaviour:

subject.error('one');
subject.next('two');
subject.complete();

Early error one

Late result two

This results in undesirable behaviour:

subject.next('two');
subject.complete();
subject.next('three');

Early result two

Late result three

The results from ReplaySubject differ but are still inconsistent with the expected results: next values and error errors are treated separately, and complete doesn't prevent observers from receiving new data. This may work for a single next/error; the problem is that next or error may be called multiple times unintentionally.

first() is used because the subscriptions are one-time subscriptions, and I would like them to be removed afterwards to avoid leaks.

How should it be implemented with RxJS observables?

Estus Flask asked Jun 30 '16 17:06



2 Answers

You are probably looking for an Rx.ReplaySubject(1) (or an Rx.AsyncSubject(), depending on your use case).

For a more detailed explanation of subjects, see What are the semantics of different RxJS subjects?.

Basically, a subject can be passed around by reference, like a deferred. As long as you hold that reference, you can emit values to it: resolve would be a 'next' (RxJS v5) or 'onNext' (RxJS v4), followed by 'complete' or 'onCompleted()'.

You can have any number of subscribers to a subject, similar to then on a deferred. If you use a ReplaySubject(1), any subscriber will receive the last emitted value, which should cover your requirement that "it doesn't matter if deferred.promise was settled before chaining with then or after". In RxJS v4, a ReplaySubject will emit its last value to a subscriber that subscribes after it has completed. I am not sure about the behaviour in RxJS v5.

  • https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/subjects/asyncsubject.md
  • https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/subjects/replaysubject.md

Update

The following code, executed with RxJS v4:

var subject = new Rx.AsyncSubject();
var deferred = subject;

deferred.subscribe(
  console.log.bind(console, 'First result'),
  console.log.bind(console, 'First error')
);

setTimeout(() => {
  deferred.subscribe(
    console.log.bind(console, 'Second result'),
    console.log.bind(console, 'Second error')
  );
});

subject.onNext('one');
subject.onCompleted();
subject.onNext('two');
subject.onNext('three');
subject.onNext('four');

produces the following output:

First result one
Second result one

However, the same code executed with RxJS v5 produces different output:

First result one
Second result four

So basically, subjects' semantics have changed in RxJS v5. That really is a breaking change to be aware of. You could consider moving back to RxJS v4, or use the workaround suggested by artur grzesiak in his answer. You could also file an issue on the GitHub site. I would believe that the change is intentional, but in the event that it is not, filing the issue might help clarify the situation. In any case, whichever behaviour is chosen should definitely be documented properly.

The question about subjects' semantics features a link showing the async subject in relation to multiple and late subscriptions.

user3743222 answered Oct 30 '22 10:10


As @user3743222 wrote, AsyncSubject may be used in a deferred implementation, but it has to be kept private and guarded against multiple resolves/rejects.

Below is a possible implementation mirroring the resolve/reject structure of a promise:

const createDeferred = () => {
  const pending = new Rx.AsyncSubject(); // caches last value / error
  const end = (result) => {
    if (pending.isStopped) {
      console.warn('Deferred already resolved/rejected.'); // optionally throw
      return;
    }
    
    if (result.isValue) {
      pending.next(result.value);
      pending.complete();
    } else {
      pending.error(result.error);
    }
  }
  return {
    resolve: (value) => end({isValue: true, value: value }),
    reject: (error) => end({isValue: false, error: error }),
    observable: pending.asObservable() // hide subject
  };
}

// sync example
let def = createDeferred();
let obs = def.observable;
obs.subscribe(n => console.log('BEFORE-RESOLVE'));
def.resolve(1);
def.resolve(2); // warn - no action
def.reject('ERROR') // warn - no action
def.observable.subscribe(n => console.log('AFTER-RESOLVE'));

// async example
def = createDeferred();
def.observable.subscribe(() => console.log('ASYNC-BEFORE-RESOLVE'));
setTimeout(() => {
  def.resolve(1);
  setTimeout(() => {
    def.observable.subscribe(() => console.log('ASYNC-AFTER-RESOLVE'));
    def.resolve(2); // warn
    def.reject('err'); // warn
  }, 1000)
}, 1000);

// async error example
const def3 = createDeferred();
def3.observable.subscribe(
  (n) => console.log(n, 'ERROR-BEFORE-REJECTED (I will not be called)'),
  (err) => console.error('ERROR-BEFORE-REJECTED', err));
setTimeout(() => {
  def3.reject('ERR');
  setTimeout(() => {
    def3.observable.subscribe(
      (n) => console.log(n, 'ERROR-AFTER-REJECTED (I will not be called)'),
      (err) => console.error('ERROR-AFTER-REJECTED', err));
    def3.resolve(2); // warn
    def3.reject('err'); // warn
  }, 1000)
}, 3000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.9/Rx.umd.js"></script>
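
Since the behaviour of subjects after completion apparently differs between RxJS versions, it may help to see the guard's intended semantics in isolation. The following is a library-free model, not RxJS code: `createSettleOnce` and all names inside it are hypothetical and exist only for this sketch. It caches the first outcome, ignores later settle attempts, replays the outcome to late subscribers, and drops observer references once they have been notified.

```javascript
// Library-free model of a settle-once deferred (illustrative, not RxJS API).
const createSettleOnce = () => {
  let outcome = null;           // { ok, payload } once settled
  const observers = new Set();  // observers still waiting for settlement

  const notify = ({ onValue, onError }) => {
    if (outcome.ok) onValue(outcome.payload);
    else if (onError) onError(outcome.payload);
  };

  const settle = (ok, payload) => {
    if (outcome) return false;  // already settled: ignore the call
    outcome = { ok, payload };
    observers.forEach(notify);
    observers.clear();          // one-time subscriptions: release references
    return true;
  };

  return {
    resolve: (value) => settle(true, value),
    reject: (error) => settle(false, error),
    subscribe: (onValue, onError) => {
      const observer = { onValue, onError };
      if (outcome) {
        notify(observer);       // late subscriber: replay the cached outcome
        return () => {};
      }
      observers.add(observer);
      return () => observers.delete(observer); // unsubscribe before settlement
    },
  };
};

const events = [];
const settleOnce = createSettleOnce();
settleOnce.subscribe((v) => events.push(`early ${v}`));
settleOnce.resolve('two');
settleOnce.resolve('three');    // ignored
settleOnce.reject('one');       // ignored
settleOnce.subscribe((v) => events.push(`late ${v}`));
console.log(events);            // both entries carry 'two'
```

Unlike a promise, this model notifies observers synchronously, which matches how subjects deliver values. The AsyncSubject-based implementation above achieves the same guard through the `isStopped` check.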
artur grzesiak answered Oct 30 '22 12:10