Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Handle priority in two observable

Tags:

rxjs

I have two stream with different priority.

Each event must play a sound that last for example 5s. requirement are:

  1. High priority event must kill lower priority sound
  2. Otherwise every event must wait that previous sound complete

I have tried this way but also low kill high event example playground

like image 290
daniel_ Avatar asked Jan 31 '26 18:01

daniel_


1 Answers

This is tricky but I think it's doable by combining concatMap and takeUntil. I left a few logs to make more obvious how it all works.

const randomStream$ = defer(() => of(null).pipe(
  delay(Math.random() * 10000),
)).pipe(take(1), repeat());

let hIndex = 0;
let lIndex = 0;
const high$ = randomStream$.pipe(map(v => `H ${hIndex++}`));
const low$ = randomStream$.pipe(map(v => `L ${lIndex++}`));

merge(high$, low$).pipe(
  concatMap(v => {
    const sound$ = of(v).pipe(
      delay(5000),
    );

    if (v[0] === 'H') { // Item from the high priority stream
      return sound$;
    } else {
      return sound$.pipe(
        takeUntil(high$.pipe(
          tap(() => console.log(`${v} canceled`))
        )),
      );
    }
  }),
).subscribe(v => console.log(`${v} done`));

Live demo: https://stackblitz.com/edit/rxjs-jdbkhb

So all emission from high$ and low$ are turned into sound$ that takes 5s. Then the logic splits for different types of stream:

  • High priority stream is just returned and thanks to concatMap it really takes 5s for sound$ to complete.

  • Low priority stream is chained with takeUntil so when another high$ is emitted it's completed immediately and the surrounding concatMap starts processing another one. If low$ comes then it's queued by concatMap.

Btw, I don't know if you want to be queing emissions from both low$ and high$ because if you have multiple low$s buffered then they'll be processed in the same order as they arrived combined with emissions from high$.

So maybe you'll want to ignore all emissions from low$ while there are pending emissions from high$ but this will require I think one side-effect (variable outside of the chain) the will count the pending emissions from high$ (or maybe just return EMPTY from concatMap when there are pending high priority items pending?).

like image 180
martin Avatar answered Feb 02 '26 12:02

martin