a: 1---2-3-4--5---6
b: ------T---------
o: ------1234-5---6
Using RxJS, is there an operator that can accomplish the diagram above? I have stream A, which is a random stream of events, and stream B, which emits a single true event. Can I have an output stream that doesn't emit anything until that true event, then sends everything it had saved up until then, and afterwards emits normally?
I thought maybe I could use buffer(), but it seems there is no way to do a one-time buffer like this with that operator.
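const { concat, interval, of, from } = rxjs;
const { share, delay, toArray, takeUntil, mergeMap } = rxjs.operators;

// Holds back values from source$ until signal$ emits, then replays them
// and passes later values through unchanged.
const waitUntil = signal$ => source$ => {
  // Share one subscription to the source between both parts of the concat.
  const sharedSource$ = source$.pipe(share());
  return concat(
    // Part 1: collect everything emitted before the signal, then emit the items one by one.
    sharedSource$.pipe(
      takeUntil(signal$),
      toArray(),
      mergeMap(values => from(values))
    ),
    // Part 2: after the signal, pass values through as they arrive.
    sharedSource$
  );
};

const stopWaiting$ = of('signal').pipe(delay(2000));
const subscription = interval(500).pipe(
  waitUntil(stopWaiting$)
).subscribe(console.log);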
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.3/rxjs.umd.js"></script>
I think @ZahiC's solution is correct, but personally I'd do it in a single chain using the multicast operator.
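a$.pipe(
  multicast(new Subject(), s => concat(
    // First part: buffer until b$ emits; the buffered values arrive as one array.
    s.pipe(
      buffer(b$),
      take(1),
    ),
    // Second part: the same stream again, unbuffered.
    s
  )),
)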
multicast will basically split the stream into two, where concat will first subscribe to the first one, which is buffered until b$ emits. Then it completes immediately because of take(1), and concat subscribes to the same stream again, but this time unbuffered.