I am sure that I should be able to find the answer to my problem in the documentation of rxjs, but I am getting nowhere...
Problem: I have two streams, one of which is more important than the other. Both provide the same information. But as soon as stream A provides a value for the first time, I want to completely dismiss stream B, because it is now outdated.
It is kind of a cache thing: provide a value from the cache, but as soon as there is a live value, update the cache and do not read from it anymore.
I have tried several combinations piping the two streams, but I cannot cancel B after A has published...
Which rx-operator could provide the functionality that I need?
It looks like you can just use merge and takeUntil:
merge(
  streamA$,
  streamB$.pipe(
    takeUntil(streamA$)
  )
)
When streamA$ emits anything, streamB$ will be ignored from then on, because takeUntil completes that branch of the chain.
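To make the timing concrete, here is a minimal sketch of the merge/takeUntil idea for the situation in the question, where A is the live stream and B is the cache. The names `liveSource`, `cacheSource` and `received` are hypothetical stand-ins, not from the question. The sketch also assumes you want `share()` on A: A is subscribed twice (once by `merge`, once as the `takeUntil` notifier), which would run a cold observable such as an HTTP request twice.

```typescript
import { Subject, merge } from 'rxjs';
import { share, takeUntil } from 'rxjs/operators';

// Hypothetical stand-ins for the two sources; Subjects make the timing explicit.
const liveSource = new Subject<string>();  // "stream A": live values
const cacheSource = new Subject<string>(); // "stream B": cached values

// share() lets merge and takeUntil use one underlying subscription to A.
// With a Subject it is harmless; with a cold observable it avoids double work.
const streamA$ = liveSource.pipe(share());

// B is only relayed until A emits for the first time.
const streamB$ = cacheSource.pipe(takeUntil(streamA$));

const received: string[] = [];
merge(streamA$, streamB$).subscribe(value => received.push(value));

cacheSource.next('cached-1'); // passes through: A has not emitted yet
liveSource.next('live-1');    // A emits: B is unsubscribed from here on
cacheSource.next('cached-2'); // dropped
liveSource.next('live-2');    // A keeps flowing

console.log(received); // [ 'cached-1', 'live-1', 'live-2' ]
```

Note that if A emits before B has produced anything, no cached value is ever delivered, which seems to match what the question asks for.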
If I understand correctly, you can merge the two streams and take the first that answers:
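import { interval, merge } from 'rxjs';
import { map, take } from 'rxjs/operators';

const interval1$ = interval(1000);
const interval2$ = interval(800);

// Tag each value with the id of the stream it came from.
const source1 = interval1$.pipe(
  map(x => ({ id: 1, value: x })),
  take(5)
);
const source2 = interval2$.pipe(
  map(x => ({ id: 2, value: x })),
  take(5)
);

// Merge both streams and keep only the very first value from either one.
const s = merge(source1, source2);
const subscribe = s.pipe(take(1)).subscribe(x => console.log(x));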
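This prints only the first value emitted by either stream, which in this case is the first element of the second stream ({ id: 2, value: 0 }), because its 800 ms interval fires before the 1000 ms one. take(1) then completes the merged stream, so no further values are printed.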