
RxJS operator waitUntil

Tags:

rxjs

a: 1---2-3-4--5---6
b: ------T---------

o: ------1234-5---6

Using RxJS, is there an operator that can accomplish the diagram above? I have stream A, which is a random stream of events, and stream B, which has a single true event. Can I get an output stream that doesn't emit anything until that true event, then sends everything it had saved up until then, and afterwards emits normally?

I thought maybe I could use buffer(), but it seems there is no way to do a one-time buffer like this with that operator.
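
To pin down the behavior in the diagram independently of any library, here is a minimal plain-JavaScript sketch. The names `createGate`, `push`, and `open` are hypothetical and introduced only for illustration; this is not an RxJS API, just a model of "hold until the signal, flush, then pass through".

```javascript
// Hypothetical helper (not part of RxJS): holds values until a signal,
// flushes them in order, then passes later values straight through.
function createGate(emit) {
  let isOpen = false;
  const pending = []; // values received before the signal

  return {
    // Called for every event on stream a.
    push(value) {
      if (isOpen) {
        emit(value); // after the signal: pass straight through
      } else {
        pending.push(value); // before the signal: save for later
      }
    },
    // Called when stream b emits its event.
    open() {
      if (isOpen) return; // ignore repeated signals
      isOpen = true;
      // Flush the saved values in arrival order.
      for (const value of pending.splice(0)) emit(value);
    },
  };
}

// Roughly replaying the diagram: some values arrive, then the signal, then the rest.
const output = [];
const gate = createGate(value => output.push(value));
[1, 2, 3].forEach(v => gate.push(v));
gate.open();
[4, 5, 6].forEach(v => gate.push(v));
console.log(output); // [ 1, 2, 3, 4, 5, 6 ]
```

Nothing reaches `emit` before `open()` is called, which is the property the output stream `o` in the diagram needs.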

delashum asked Mar 05 '23 16:03


2 Answers

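const { concat, interval, of, from } = rxjs;
const { share, delay, toArray, takeUntil, mergeMap } = rxjs.operators;

// Holds back everything from source$ until signal$ emits, replays it,
// then continues with live values.
const waitUntil = signal$ => source$ => {
  // share() lets both branches below read from one subscription to source$.
  const sharedSource$ = source$.pipe(share());
  return concat(
    // Phase 1: collect values until the signal, then emit them one by one.
    sharedSource$.pipe(
      takeUntil(signal$),
      toArray(),
      // Arrow wrapper so mergeMap's index argument is not passed on to from().
      mergeMap(buffered => from(buffered))
    ),
    // Phase 2: pass through live values from the same shared source.
    sharedSource$
  );
};

// Demo: the signal fires after 2 seconds.
const stopWaiting$ = of('signal').pipe(delay(2000));

// subscribe() returns a Subscription, not an Observable.
const subscription = interval(500).pipe(
    waitUntil(stopWaiting$)
).subscribe(console.log);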
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.3/rxjs.umd.js"></script>
ZahiC answered Apr 27 '23 11:04


I think @ZahiC's solution is correct, but personally I'd do it in a single chain using the multicast operator.

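const { concat, Subject } = rxjs;
const { multicast, buffer, take } = rxjs.operators;

// a$ is the source stream and b$ the signal stream from the question.
a$.pipe(
  // A factory gives each subscription its own fresh Subject.
  multicast(() => new Subject(), s => concat(
    s.pipe(
      buffer(b$),
      take(1),
    ),
    s
  )),
)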

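multicast basically splits the stream into two. concat first subscribes to the buffered branch, which collects values until b$ emits. That branch then completes immediately because of take(1), and concat subscribes to the same stream again, this time unbuffered. Note that buffer emits the saved values as a single array; add mergeAll() after take(1) if they should be emitted one at a time.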

martin answered Apr 27 '23 12:04