I have two stream with different priority.
Each event must play a sound that last for example 5s. requirement are:
I have tried this way but also low kill high event example playground
This is tricky but I think it's doable by combining concatMap and takeUntil. I left a few logs to make more obvious how it all works.
const randomStream$ = defer(() => of(null).pipe(
delay(Math.random() * 10000),
)).pipe(take(1), repeat());
let hIndex = 0;
let lIndex = 0;
const high$ = randomStream$.pipe(map(v => `H ${hIndex++}`));
const low$ = randomStream$.pipe(map(v => `L ${lIndex++}`));
merge(high$, low$).pipe(
concatMap(v => {
const sound$ = of(v).pipe(
delay(5000),
);
if (v[0] === 'H') { // Item from the high priority stream
return sound$;
} else {
return sound$.pipe(
takeUntil(high$.pipe(
tap(() => console.log(`${v} canceled`))
)),
);
}
}),
).subscribe(v => console.log(`${v} done`));
Live demo: https://stackblitz.com/edit/rxjs-jdbkhb
So all emission from high$ and low$ are turned into sound$ that takes 5s. Then the logic splits for different types of stream:
High priority stream is just returned and thanks to concatMap it really takes 5s for sound$ to complete.
Low priority stream is chained with takeUntil so when another high$ is emitted it's completed immediately and the surrounding concatMap starts processing another one. If low$ comes then it's queued by concatMap.
Btw, I don't know if you want to be queing emissions from both low$ and high$ because if you have multiple low$s buffered then they'll be processed in the same order as they arrived combined with emissions from high$.
So maybe you'll want to ignore all emissions from low$ while there are pending emissions from high$ but this will require I think one side-effect (variable outside of the chain) the will count the pending emissions from high$ (or maybe just return EMPTY from concatMap when there are pending high priority items pending?).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With