I'm having a hard time building a complex observable pipeline, and I would be grateful if someone could help me with it…
I have one stream of data that delivers values over Bluetooth; these are data frames that I have to decode.
This is a BehaviorSubject called RX$.
On RX$, I sometimes receive Instant Data (INST) and sometimes History Data (HIST). With INST I receive, among other things, the version and model of the device that is sending the data. I successfully built an observable that produces a JSON object with the device version and model and that doesn't emit until it has both; let's call it deviceVersionModelStream$.
On the other side, I receive HIST data frames in bulk in a stream we will call historyStream$, and because there is a lot of data, I used bufferTime(2000) to collect the frames into an array and rely on my embedded database's bulk insert (instead of inserting them one by one).
This worked well until now…
Now my customer has added a new rule: they have an old device type that cannot give me some data in a specific case, but because it uses the same pattern, I know what it gives me instead.
Therefore, I need to have the device version and model before decoding a frame and inserting it into the database.
My question is: how can I hold back historyStream$ emissions until deviceVersionModelStream$ has emitted once (it is a hot observable too, used in other places) and, when it does, generate some kind of JSON object containing both the raw frame and the version/model?
I ALSO want to dispatch this information gradually so that I don't overwhelm my database bulk inserts, as my bufferTime(2000) did before.
I've been trying things with buffer, mergeMap, and delay, but I'm having a hard time achieving this…
Maybe someone strong with RX could help me?
Thanks a lot
This post is a little old, but I recently stumbled upon it while searching for "buffer rxjs observables until another event occurred", and I thought I would share what an updated version would look like.
I built this on https://thinkrx.io. Here is the code I used:
// rxObserver is the thinkrx.io playground helper that draws each stream
const { rxObserver } = require('api/v0.3');
const { timer, of, combineLatest, concat } = require('rxjs');
const { delay, take, share, buffer, mergeAll, skipUntil } = require('rxjs/operators');

// Stand-in for the history frames: one value every 10 ms, 10 values in total
const historyStream$ = timer(0, 10).pipe(
  share(), take(10)
);

// Stand-in for the version/model: emits "A" once, after 50 ms
const versionModel$ = of("A").pipe(
  delay(50),
  take(1)
);

historyStream$.subscribe(rxObserver('History Stream'));
versionModel$.subscribe(rxObserver('Version Model'));

combineLatest([
  versionModel$,
  concat([
    // Values received before versionModel$ emits, delivered as one array
    historyStream$.pipe(buffer(versionModel$)),
    // Values received after versionModel$ emits, delivered one by one
    historyStream$.pipe(skipUntil(versionModel$))
  ]).pipe(
    mergeAll()
  )
]).subscribe(rxObserver("Buffer Window"));
Here is the output:
What's going on here is that we are passing two observables into concat: one that represents the buffered set of events, and one that represents the stream of events as they come in. We finally pipe all of that into mergeAll to get the appropriate effect.
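To make the hold-then-release logic explicit, here is a minimal sketch of the same idea in plain JavaScript, without RxJS. It is not the pipeline above, only an illustration of what it is meant to achieve for the original question: frames are held until the version/model is known, each frame is then paired with it, and the results are handed out in bounded batches, which is the role bufferTime(2000) played before. The names createGatedBatcher, pushFrame, setVersionModel and nextBatch are hypothetical, and the version/model values are placeholders.

```javascript
// Hypothetical helper (not part of RxJS): holds raw frames until the device
// version/model is known, then releases them in bounded batches.
function createGatedBatcher(maxBatchSize) {
  let versionModel = null; // unknown until the first value arrives
  const held = [];         // raw frames received before version/model is known
  const ready = [];        // tagged frames waiting for the next bulk insert

  return {
    // Call for every HIST frame.
    pushFrame(frame) {
      if (versionModel === null) {
        held.push(frame);
      } else {
        ready.push({ frame, versionModel });
      }
    },
    // Call when the version/model stream emits; only the first value is used.
    setVersionModel(value) {
      if (versionModel !== null) return;
      versionModel = value;
      // Release everything that was held, in arrival order.
      for (const frame of held.splice(0)) {
        ready.push({ frame, versionModel });
      }
    },
    // Call on a timer (for example every 2000 ms); returns at most
    // maxBatchSize tagged frames and removes them from the queue.
    nextBatch() {
      return ready.splice(0, maxBatchSize);
    },
  };
}

// Usage: three frames arrive before the version/model, two after.
const batcher = createGatedBatcher(4);
["f0", "f1", "f2"].forEach((f) => batcher.pushFrame(f));
const beforeGate = batcher.nextBatch(); // nothing is released yet
batcher.setVersionModel({ version: "1.0", model: "A" }); // placeholder values
["f3", "f4"].forEach((f) => batcher.pushFrame(f));
const firstBatch = batcher.nextBatch();  // up to 4 tagged frames
const secondBatch = batcher.nextBatch(); // whatever is left
```

Calling nextBatch from a timer keeps each bulk insert bounded, while the gate guarantees that no frame reaches the decoder before the version/model is available.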