Working with RxJS in Angular 4.x, I'm seeing two very different patterns for generating Observables from streams of user initiated actions. One stream is the direct result of a user clicking an 'add item' button that generates a new object. The other is a series of events issued by some third party code I'm using.
I want to be able to combine these two streams using something like 'combineLatest' to generate a single Observable.
With my button, I've followed the following pattern:
const signal = Observable.create(
(observer) => {
this.additem= (item) => observer.next(item);
}
);
this.item$ = signal.map((item) => [item])
.scan((accumulator, value) => {
return accumulator.concat(value);
});
However, I'm seeing a lot of information saying I should be using Subjects instead - which I am trying to use with my event callback like so:
sort$ = new Subject();
sortChange(sort){
sort$.next(sort);
}
Then I'm attempting to combine these like this:
combine$ = Observable.combineLatest(sort$, item$,
(sort, items) => {
return "something that does stuff with these";}
);
My questions are - what is the preferred pattern for 'manually' generating streams? Can/should observables and subjects be meshed together into a single observable like I'm trying to do here?
Observables Can Emit Data and Notifications AsynchronouslyWe pass a success callback to the promise object by calling its then() method. In the success callback, we emit the data returned from fetch by calling observer. next(pikachu) and also notify the observer that we have finished sending data by calling observer.
Use takeUntil with a Subject as notifier to complete Observables. You can pass a file id to the Subject and use filter in takeUntil to only cancel the file upload of a file with a given id. Use defaultIfEmpty to provide a value that indicates a cancelled request.
Angular provides an EventEmitter class that is used when publishing values from a component through the @Output() decorator. EventEmitter extends RxJS Subject, adding an emit() method so it can send arbitrary values. When you call emit(), it passes the emitted value to the next() method of any subscribed observer.
Of course you can combine Observables and Subjects into one stream.
I think the question here is what makes more sense in you usecase. From your description when implementing something like "add item" functionality I'd prefer Subject
over Observable.create
.
This is because every time you subscribe to your signal
you're reassigning this.additem
. The callback to Observable.create
is called for every observer. Note that more correct usage of Observable.create
would look like this:
const signal = Observable.create((observer) => {
this.additem = (item) => observer.next(item);
return () => this.additem = null;
});
The returned callback () => this.additem = null
is called when you unsubscribe from this Observable and that's the place where you should handle all cleanup.
However, if you make two subscriptions to signal
then you'll override this.additem
twice and then if you chose to unsubscribe one of the observers you would this.additem = null
and it would probably lead to an unexpected behavior.
So in this case it makes more sense to use Subject
. For example like this:
const subject = new Subject();
this.additem = (item) => subject.next(item);
If you want to see more real life example of Observable.create
have a look at for example this: Subscribe to a stream with RxJS and twitter-stream-api module
Edit: Also have a look at these articles from the lead developer of RxJS 5:
https://medium.com/@benlesh/on-the-subject-of-subjects-in-rxjs-2b08b7198b93
https://medium.com/@benlesh/rxjs-dont-unsubscribe-6753ed4fda87
https://medium.com/@benlesh/hot-vs-cold-observables-f8094ed53339
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With