I'm filtering an Observable
with the input of another Observable
- the input for the filtering comes from the user.
The filtering is done with the RxJS operator combineLatest
. Using this means that when subscribing to this stream, no values are emitted until there has been an emission from both source Observables - I'd like the created stream to emit on creation (without any filtering), before any user input takes place.
I think I should be using the startWith
operator so the stream has an emission on creation, but I can't work out how to seed this from an Observable. Using an Observable because data comes from Firebase and is handled with FirebaseListObservable
's.
Below is a pieced-together version of what I'm doing currently.
let tagInput = document.getElementById('tags');
let tagExclusionStream = Observable
.fromEvent(tagInput, 'input')
.map((e: any) => createsArrayFromInput(e.target.value));
let allTags: Observable<any[]> = getAllTags();
let filteredTags = allTags
.combineLatest(tagExclusionStream, (tags, tagExclusions) => {
return tags.filter((tag: any) => tagExclusions.indexOf(tag.$key) == -1)
});
// I want this to print out without needing the tagExclusionStream to emit first
filteredTags.subscribe(tags => console.log("Tags:", tags))
Please let me know if my approach here is completely off/there's a better way as I'm new to RxJS.
startWith
is just using concat
internally.
So do
const startValue$ = of('start with this')
concat(startValue$, source$)
Or write your own operator
function startFrom<T, O extends ObservableInput<any>>(start: O): OperatorFunction<T, T | ObservedValueOf<O>> {
return (source: Observable<T>) => concat(start, source)
}
source$.pipe(
startFrom(startValue$)
)
Make sure startValue$
completes
The new Observable will switch to source$
once startValue$
completes.
So if the Observable to start with is a long-lived Observable e.g. a ReplaySubject
where you only care about the most recent value you have to make sure the Observable you pass in completes, e.g. with take(1)
.
const { ReplaySubject, concat, of } = rxjs;
const { take } = rxjs.operators;
const source$ = of('source-1', 'source-2')
const replay = new ReplaySubject(1)
replay.next('subject-1')
replay.next('subject-2')
const startWith$ = replay.pipe(take(1)) // <-- take one item and complete
const o$ = concat(startWith$, source$)
o$.subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js" integrity="sha256-85uCh8dPb35sH3WK435rtYUKALbcvEQFC65eg+raeuc=" crossorigin="anonymous"></script>
I think this will do the trick:
let filteredTags = allTags
.combineLatest(tagExclusionStream.startWith(''), (tags, tagExclusions) => {
return tags.filter((tag: any) => tagExclusions.indexOf(tag.$key) == -1)
});
Alternatively, if you use tagExclusionStream
in different places you can do this:
let tagExclusionStream = Observable
.fromEvent(tagInput, 'input')
.map((e: any) => createsArrayFromInput(e.target.value))
.startWith('');
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With