I'm trying to find an operator that behaves similarly to concatMap, but drops everything in between. E.g., concatMap does the following:
- next a
- start handling a
- next b
- next c
- finish handling a
- start handling b
- finish handling b
- start handling c
- finish handling c
Instead, I'm looking for a mechanism that'll drop b, since c has already come in:
- next a
- start handling a
- next b
- next c
- finish handling a
- (skip b)
- start handling c
- finish handling c
See this gist for a more expanded example: https://gist.github.com/Burgov/afeada0d8aad58a9592aef6f3fc98543
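To make the target semantics concrete, here is a plain-Promise sketch of the behavior I'm after (no RxJS involved; makeLatestProcessor and the other names are purely illustrative, not part of the gist):

```javascript
// Sketch only: a plain-Promise version of the behavior described above.
// 'handle' stands in for the async work; 'log' records the event order.
function makeLatestProcessor(handle, log) {
  let busy = false;
  let pending;           // latest value that arrived while busy
  let hasPending = false;

  async function drain(value) {
    busy = true;
    log.push(`start handling ${value}`);
    await handle(value);
    log.push(`finish handling ${value}`);
    busy = false;
    if (hasPending) {
      hasPending = false;
      drain(pending);    // only the most recent skipped value survives
    }
  }

  return value => {
    log.push(`next ${value}`);
    if (busy) {
      pending = value;   // overwrite any previously skipped value
      hasPending = true;
    } else {
      drain(value);
    }
  };
}

// Feed a, b and c roughly as in the sequence above:
const log = [];
const next = makeLatestProcessor(() => new Promise(r => setTimeout(r, 50)), log);
next('a');
setTimeout(() => next('b'), 10);
setTimeout(() => next('c'), 20);
setTimeout(() => console.log(log.join('\n')), 200);
```

Here b is skipped because c had already arrived by the time a finished.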
I think the operator you are looking for is throttle.
Here is a working Stackblitz. The key to making this work is the config object passed to throttle(), which allows it to emit (and process) both the leading and trailing source emissions, but to ignore any emissions in between while processData() is running.
Here is the key code from the Stackblitz:
// Use 'from' to emit the above array one object at a time
const source$ = from(sourceData).pipe(
  // Simulate a delay of 'delay * delayInterval' between emissions
  concatMap(data => of(data).pipe(delay(data.delay * delayInterval))),
  // Now tap in order to show the emissions on the console
  tap(data => console.log('next ', data.emit)),
  // Finally, throttle as long as 'processData' is handling the emission
  throttle(data => processData(data), { leading: true, trailing: true }),
);
const subscription = source$.subscribe();
Short and sweet, and works as required except for one issue...
That "one issue" with the code above is that when the source observable completes, throttle() unsubscribes from processData, effectively stopping any final processing that needed to be done. The fix, as Bart van den Burg pointed out in the comments below, is to use a Subject. I figure there are many ways to do this, but the Stackblitz has been updated with the following code that now works:
// Set up a Subject to be the source of data so we can manually complete it
const source$ = new Subject();

// The data observable is set up just to emit as per the gist
const dataSubscribe = from(sourceData).pipe(
  // Simulate a delay of 'delay * delayInterval' before the emission
  concatMap(data => of(data).pipe(delay(data.delay * delayInterval))),
).subscribe(data => {
  console.log('next ', data.emit); // log the emission to console
  source$.next(data);              // send this emission into the source
});

// Finally, subscribe to the source$ so we can process the data
const sourceSubscribe = source$.pipe(
  // Throttle as long as 'processData' is handling the emission
  throttle(data => processData(data), { leading: true, trailing: true })
).subscribe(); // will need to manually unsubscribe later ...
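On the "manually unsubscribe later" point: one option is to track the in-flight processing and only tear down once it settles. A rough sketch in plain JavaScript (doWork and enqueue are hypothetical stand-ins I've made up, with doWork playing the role of the gist's processData):

```javascript
// Sketch only: remember the latest in-flight processing promise so that
// teardown can wait for it before unsubscribing.
const processed = [];

// Hypothetical stand-in for processData
const doWork = data =>
  new Promise(resolve => setTimeout(() => {
    processed.push(data); // simulate finishing the async work
    resolve(data);
  }, 30));

let inFlight = Promise.resolve();

// Wrap each processing call so the latest promise is always recorded
const enqueue = data => (inFlight = doWork(data));

async function shutdown(unsubscribe) {
  await inFlight; // let the final processing call finish first
  unsubscribe();  // now it is safe to tear the subscription down
}
```

In this sketch, enqueue corresponds to the call throttle() makes into processData, and the unsubscribe callback would be sourceSubscribe.unsubscribe() from the snippet above.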
This is the easiest solution I was able to make:
const source = new Subject();
const start = new Date();
const mockDelayedObs = val => of(val).pipe(delay(1200));

source.pipe(
  multicast(
    new ReplaySubject(1),
    subject => {
      let lastValue;
      return subject.pipe(
        filter(v => v !== lastValue),
        exhaustMap(v => {
          lastValue = v;
          return mockDelayedObs(v);
        }),
        take(1),
        repeat(),
      );
    }
  ),
).subscribe(v => {
  console.log(new Date().getTime() - start.getTime(), v);
});

setTimeout(() => source.next(0), 0);
setTimeout(() => source.next(1), 500);
setTimeout(() => source.next(2), 1000);
setTimeout(() => source.next(3), 1500);
setTimeout(() => source.next(4), 1800);
setTimeout(() => source.next(5), 4000);
Live demo: https://stackblitz.com/edit/rxjs-z33jgp?devtoolsheight=60
The order of actions is like this:
next 0
start handling 0
next 1
next 2
finish handling 0
start handling 2
next 3
next 4
finish handling 2
start handling 4
finish handling 4
next 5
start handling 5
finish handling 5
So only 0, 2, 4 and 5 will be printed.
This would work without the multicast operator as well, but I wanted to avoid leaking state variables. It seems it's not entirely possible without them, so there's just one: lastValue. This variable is used only to avoid calling mockDelayedObs for the same value twice after resubscribing to the same chain with repeat().