For an arbitrary promise implementation, the deferred pattern (not to be confused with the deferred antipattern) may look like this:
const deferred = new Deferred;
...
// scopes where `deferred` object reference was passed before promise settlement
deferred.promise.then((result) => { ... }, (error) => { ... });
...
deferred.resolve(...);
// doesn't affect promise state
deferred.reject();
...
// after promise settlement
deferred.promise.then((result) => { ... }, (error) => { ... });
The `deferred` object holds an unsettled promise that can be passed to other function scopes by reference. All promise chains are executed on promise settlement; it doesn't matter whether `deferred.promise` was settled before or after chaining with `then`. The state of a promise cannot be changed once it has been settled.
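To make the snippet above concrete, here is a minimal sketch of a `Deferred`, assuming it simply wraps the native `Promise` constructor. The class is hypothetical; the deferred from any particular promise library may differ in detail.

```javascript
// Minimal Deferred built on the native Promise constructor (an assumption:
// the question does not say which promise implementation is used).
class Deferred {
  constructor() {
    this.promise = new Promise((resolve, reject) => {
      // Expose the settle functions so any holder of the reference can call them.
      this.resolve = resolve;
      this.reject = reject;
    });
  }
}

const deferred = new Deferred();
const seen = [];

// Chained before settlement.
deferred.promise.then((result) => seen.push('early ' + result));

deferred.resolve('one');
deferred.resolve('two');              // ignored: the promise is already resolved
deferred.reject(new Error('three'));  // ignored: the promise is already resolved

// Chained after settlement: still receives the first result.
deferred.promise.then((result) => {
  seen.push('late ' + result);
  console.log(seen);                  // [ 'early one', 'late one' ]
});
```

Both handlers receive `'one'`, and the later `resolve`/`reject` calls have no effect. That is the behaviour the observable-based version is expected to reproduce.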
As the answer suggests, the initial choices are `ReplaySubject` and `AsyncSubject`.
For the given setup (a demo):
var subject = new Rx.AsyncSubject();
var deferred = subject.first();

deferred.subscribe(
  console.log.bind(console, 'Early result'),
  console.log.bind(console, 'Early error')
);

setTimeout(() => {
  deferred.subscribe(
    console.log.bind(console, 'Late result'),
    console.log.bind(console, 'Late error')
  );
});
This results in desirable behaviour:
subject.error('one');
subject.next('two');
Output:
Early error one
Late error one
This results in undesirable behaviour:
subject.error('one');
subject.next('two');
subject.complete();
Output:
Early error one
Late result two
This also results in undesirable behaviour:
subject.next('two');
subject.complete();
subject.next('three');
Output:
Early result two
Late result three
The results from `ReplaySubject` differ but are still inconsistent with the expected results. `next` values and `error` errors are treated separately, and `complete` doesn't prevent the observers from receiving new data. This may work for a single `next`/`error`; the problem is that `next` or `error` may be called multiple times unintentionally.
`first()` is used because the `subscribe` calls are one-time subscriptions, and I would like them to be removed afterwards to avoid leaks.
How should it be implemented with RxJS observables?
defer allows you to create an Observable only when the Observer subscribes. It waits until an Observer subscribes to it, calls the given factory function to get an Observable -- where a factory function typically generates a new Observable -- and subscribes the Observer to this Observable.
While an Observable can do everything a Promise can, the reverse is not true. For example, an Observable can emit multiple values over time. A Promise only resolves once. This may lead you to believe that the Observable is BETTER than the Promise.
RxJS introduces Observables, a new Push system for JavaScript. An Observable is a Producer of multiple values, "pushing" them to Observers (Consumers). A Function is a lazily evaluated computation that synchronously returns a single value on invocation.
The Promise is always asynchronous, while the Observable can be either asynchronous or synchronous. The Promise can provide a single value, whereas the Observable is a stream of values (from zero to many). You can apply RxJS operators to an Observable to get a new, tailored stream.
You are probably looking for an `Rx.ReplaySubject(1)` (or an `Rx.AsyncSubject()`, depending on your use case).
For a more detailed explanation of subjects, see "What are the semantics of different RxJS subjects?".
Basically, a subject can be passed around by reference, like a deferred. You can emit values to it as long as you hold that reference (a resolve would be a `next` (RxJS v5) or `onNext` (RxJS v4), followed by `complete` or `onCompleted()`).
You can have any number of subscribers to a subject, similar to the `then` calls on a deferred. If you use a `ReplaySubject(1)`, any subscriber will receive the last emitted value, which should address your requirement that "it doesn't matter if deferred.promise was settled before chaining with then or after". In RxJS v4, a `ReplaySubject` emits its last value to a subscriber that subscribes after it has completed. I am not sure about the behaviour in RxJS v5.
The following code, executed with RxJS v4:
var subject = new Rx.AsyncSubject();
var deferred = subject;

deferred.subscribe(
  console.log.bind(console, 'First result'),
  console.log.bind(console, 'First error')
);

setTimeout(() => {
  deferred.subscribe(
    console.log.bind(console, 'Second result'),
    console.log.bind(console, 'Second error')
  );
});

subject.onNext('one');
subject.onCompleted();
subject.onNext('two');
subject.onNext('three');
subject.onNext('four');
produces the following output:
First result one
Second result one
However, the equivalent code executed with RxJS v5 (using `next`/`complete` in place of `onNext`/`onCompleted`) produces a different output:
First result one
Second result four
So basically, the semantics of subjects have changed in RxJS v5. That really is a breaking change to be aware of. You could consider moving back to RxJS v4, or use the workaround suggested by artur grzesiak in his answer. You could also file an issue on the GitHub site. I believe the change is intentional, but in the event it is not, filing an issue might help clarify the situation. In any case, whichever behaviour is chosen should definitely be documented properly.
The question about subjects' semantics features a link showing how the async subject behaves with multiple and late subscriptions.
As @user3743222 wrote, `AsyncSubject` may be used in a `deferred` implementation, but it has to be private and guarded against multiple `resolve`s/`reject`s.
Below is a possible implementation mirroring the resolve-reject-promise structure:
const createDeferred = () => {
  const pending = new Rx.AsyncSubject(); // caches last value / error

  const end = (result) => {
    // Guard: once the subject has completed or errored, ignore further calls.
    if (pending.isStopped) {
      console.warn('Deferred already resolved/rejected.'); // optionally throw
      return;
    }

    if (result.isValue) {
      pending.next(result.value);
      pending.complete();
    } else {
      pending.error(result.error);
    }
  };

  return {
    resolve: (value) => end({ isValue: true, value: value }),
    reject: (error) => end({ isValue: false, error: error }),
    observable: pending.asObservable() // hide subject
  };
};

// sync example
let def = createDeferred();
let obs = def.observable;
obs.subscribe(n => console.log('BEFORE-RESOLVE'));
def.resolve(1);
def.resolve(2); // warn - no action
def.reject('ERROR'); // warn - no action
def.observable.subscribe(n => console.log('AFTER-RESOLVE'));

// async example
def = createDeferred();
def.observable.subscribe(() => console.log('ASYNC-BEFORE-RESOLVE'));
setTimeout(() => {
  def.resolve(1);
  setTimeout(() => {
    def.observable.subscribe(() => console.log('ASYNC-AFTER-RESOLVE'));
    def.resolve(2); // warn
    def.reject('err'); // warn
  }, 1000);
}, 1000);

// async error example
const def3 = createDeferred();
def3.observable.subscribe(
  (n) => console.log(n, 'ERROR-BEFORE-REJECTED (I will not be called)'),
  (err) => console.error('ERROR-BEFORE-REJECTED', err));
setTimeout(() => {
  def3.reject('ERR');
  setTimeout(() => {
    def3.observable.subscribe(
      (n) => console.log(n, 'ERROR-AFTER-REJECTED (I will not be called)'),
      (err) => console.error('ERROR-AFTER-REJECTED', err));
    def3.resolve(2); // warn
    def3.reject('err'); // warn
  }, 1000);
}, 3000);

<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.9/Rx.umd.js"></script>
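For comparison, the contract that the snippet above aims for (settle once, cache the outcome, replay it to late subscribers) can be sketched without RxJS. This is only an illustration of the semantics, not a replacement for `AsyncSubject`: `createSettleOnceDeferred` and its `subscribe(onValue, onError)` signature are hypothetical names made up here, and unlike a promise this sketch notifies subscribers synchronously.

```javascript
// Hypothetical, dependency-free sketch (not RxJS): a settle-once deferred
// whose subscribers receive the cached outcome whenever they subscribe.
function createSettleOnceDeferred() {
  let outcome = null;     // null while pending, then { ok, payload }
  const observers = [];   // subscribers still waiting for the outcome

  const notify = ({ onValue, onError }) => {
    if (outcome.ok) {
      if (onValue) onValue(outcome.payload);
    } else if (onError) {
      onError(outcome.payload);
    }
  };

  const settle = (ok, payload) => {
    if (outcome !== null) return false;   // already settled: ignore the call
    outcome = { ok, payload };
    observers.splice(0).forEach(notify);  // flush and release waiting subscribers
    return true;
  };

  return {
    resolve: (value) => settle(true, value),
    reject: (error) => settle(false, error),
    subscribe: (onValue, onError) => {
      const observer = { onValue, onError };
      if (outcome !== null) notify(observer);  // late subscriber: replay outcome
      else observers.push(observer);
    },
  };
}

const log = [];
const settleOnce = createSettleOnceDeferred();

settleOnce.subscribe((v) => log.push('early ' + v), (e) => log.push('early error ' + e));
settleOnce.resolve('one');
settleOnce.resolve('two');    // ignored, returns false
settleOnce.reject('three');   // ignored, returns false
settleOnce.subscribe((v) => log.push('late ' + v), (e) => log.push('late error ' + e));

console.log(log);             // [ 'early one', 'late one' ]
```

Waiting subscribers are dropped from the internal list as soon as they are notified, which covers the leak concern from the question without needing `first()`.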