
RxJs - concatMap alternative that drops everything in between

Tags:

rxjs

I'm trying to find an operator that behaves similarly to concatMap, but drops everything in between. E.g., concatMap does the following:

  • next a
  • start handling a
  • next b
  • next c
  • finish handling a
  • start handling b
  • finish handling b
  • start handling c
  • finish handling c

Instead, I'm looking for a mechanism that'll drop b, since c has already come in:

  • next a
  • start handling a
  • next b
  • next c
  • finish handling a
  • (skip b)
  • start handling c
  • finish handling c
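
To pin down the intended semantics independently of any particular operator, here is a minimal, dependency-free sketch in plain JavaScript. It is not an RxJS solution; `createLatestOnlyQueue`, `handle` and `done` are hypothetical names made up for this illustration. It handles one value at a time and remembers only the most recent value that arrived in the meantime:

```javascript
// Hypothetical helper, not part of RxJS: runs one handler at a time and
// remembers only the most recent value that arrived while it was busy.
function createLatestOnlyQueue(handle) {
  let busy = false;        // true while a value is being handled
  let hasPending = false;  // true if a value arrived while busy
  let pending;             // the most recent such value

  function start(value) {
    busy = true;
    // `handle` is expected to call `done` exactly once when it has finished.
    handle(value, () => {
      busy = false;
      if (hasPending) {
        const latest = pending;
        hasPending = false;
        pending = undefined;
        start(latest);
      }
    });
  }

  return function next(value) {
    if (busy) {
      // Overwrite whatever was waiting: intermediate values are dropped.
      pending = value;
      hasPending = true;
    } else {
      start(value);
    }
  };
}

// Usage: replay the a, b, c sequence from the list above.
const log = [];
const finishers = [];
const next = createLatestOnlyQueue((value, done) => {
  log.push(`start handling ${value}`);
  // Keep a callback around so the "finish" moment can be triggered by hand.
  finishers.push(() => {
    log.push(`finish handling ${value}`);
    done();
  });
});

next('a');
next('b');
next('c');
finishers.shift()(); // a finishes, so c starts and b is skipped
finishers.shift()(); // c finishes
console.log(log.join('\n'));
```

The handler here finishes only when the stored callback is invoked, which keeps the example deterministic; in real code `done` would be called when the asynchronous work completes.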

See this gist for a more expanded example: https://gist.github.com/Burgov/afeada0d8aad58a9592aef6f3fc98543

Bart van den Burg asked Dec 12 '18 08:12


2 Answers

I think the operator you are looking for is throttle.

Here is a working Stackblitz. The key is the config object passed to throttle(): with both leading and trailing set to true, it emits (and processes) the leading and trailing source emissions, but ignores any emissions that arrive in between, while processData() is still running.

Here is the key function from the Stackblitz:

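import { from, of } from 'rxjs';
import { concatMap, delay, tap, throttle } from 'rxjs/operators';

// 'sourceData', 'delayInterval' and 'processData' are defined elsewhere in the Stackblitz.
// Use 'from' to emit the 'sourceData' array one object at a time
const subscription = from(sourceData).pipe(

  // Simulate a delay of 'delay * delayInterval' between emissions
  concatMap(data => of(data).pipe(delay(data.delay * delayInterval))),

  // Now tap in order to show the emissions on the console.
  tap(data => console.log('next ', data.emit)),

  // Finally, throttle as long as 'processData' is handling the emission
  throttle(data => processData(data), { leading: true, trailing: true }),

).subscribe(); // subscribe() returns a Subscription, not an Observable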

Short and sweet, and works as required except for one issue...

Update:

That "one issue" with the code above is that when the source observable completes, throttle() unsubscribes from processData, effectively stopping any final processing that needed to be done. The fix, as Bart van den Burg pointed out in the comments below, is to use a Subject. I figure there are many ways to do this, but the Stackblitz has been updated with the following code that now works:

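import { Subject, from, of } from 'rxjs';
import { concatMap, delay, throttle } from 'rxjs/operators';

// Set up a Subject to be the source of data so we can manually complete it
const source$ = new Subject();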

// the data observable is set up just to emit as per the gist.
const dataSubscribe = from(sourceData).pipe(
    // Simulate a delay of 'delay * delayInterval' before the emission
    concatMap(data => of(data).pipe(delay(data.delay * delayInterval))),
).subscribe(data => {
    console.log('next ', data.emit); // log the emission to console
    source$.next(data); // Send this emission into the source
});

// Finally, subscribe to the source$ so we can process the data
const sourceSubscribe = source$.pipe(
    // throttle as long as 'processData' is handling the emission
    throttle(data => processData(data), { leading: true, trailing: true })
).subscribe(); // will need to manually unsubscribe later ...

dmcgrandle answered Oct 01 '22 14:10


This is the easiest solution I was able to make:

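// RxJS 6 import paths; note that 'multicast' is deprecated as of RxJS 7.
import { Subject, ReplaySubject, of } from 'rxjs';
import { delay, multicast, filter, exhaustMap, take, repeat } from 'rxjs/operators';

const source = new Subject();
const start = new Date();
// Stand-in for the real work: emits the value back after 1200 ms
const mockDelayedObs = val => of(val).pipe(delay(1200));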

source.pipe(
  multicast(
    new ReplaySubject(1),
    subject => {
      let lastValue;

      return subject.pipe(
        filter(v => v !== lastValue),
        exhaustMap(v => {
          lastValue = v;
          return mockDelayedObs(v);
        }),
        take(1),
        repeat(),
      );
    }
  ),
)
.subscribe(v => {
  console.log(new Date().getTime() - start.getTime(), v);
});

setTimeout(() => source.next(0), 0);
setTimeout(() => source.next(1), 500);
setTimeout(() => source.next(2), 1000);
setTimeout(() => source.next(3), 1500);
setTimeout(() => source.next(4), 1800);
setTimeout(() => source.next(5), 4000);

Live demo: https://stackblitz.com/edit/rxjs-z33jgp?devtoolsheight=60

The order of actions is like this:

next 0
start handling 0
next 1
next 2
finish handling 0
start handling 2
next 3
next 4
finish handling 2
start handling 4
finish handling 4
start handling 5
finish handling 5

So only 0, 2, 4 and 5 will be printed.
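
As a sanity check on that sequence, the timings can be replayed in a small synchronous simulation. This is only a sketch of the "keep the latest pending value" rule, not of the RxJS chain itself; `simulateLatestOnly` is a hypothetical helper, and it assumes arrivals are sorted by time and that every value takes the same fixed time to handle (1200 ms, as in mockDelayedObs):

```javascript
// Hypothetical helper: given arrival times (ms) and a fixed handling duration,
// return the values handled when only the latest pending value is kept.
// Assumes `arrivals` is sorted by `at`.
function simulateLatestOnly(arrivals, duration) {
  const handled = [];
  let busyUntil = -Infinity; // time at which the current handler finishes
  let pending = null;        // latest arrival received while busy

  // If the handler has finished by `time`, start the pending value (if any).
  function flush(time) {
    if (pending !== null && busyUntil <= time) {
      handled.push(pending.value);
      busyUntil = busyUntil + duration; // it starts when the previous one finished
      pending = null;
    }
  }

  for (const arrival of arrivals) {
    flush(arrival.at);
    if (arrival.at >= busyUntil) {
      // Idle: handle this value immediately.
      handled.push(arrival.value);
      busyUntil = arrival.at + duration;
    } else {
      // Busy: this value replaces whatever was waiting.
      pending = arrival;
    }
  }
  flush(Infinity); // let the last pending value run after the source goes quiet
  return handled;
}

// The same timings as the setTimeout calls above.
const timeline = [
  { at: 0, value: 0 },
  { at: 500, value: 1 },
  { at: 1000, value: 2 },
  { at: 1500, value: 3 },
  { at: 1800, value: 4 },
  { at: 4000, value: 5 },
];
const result = simulateLatestOnly(timeline, 1200);
console.log(result.join(', ')); // prints "0, 2, 4, 5"
```

Values 1 and 3 are dropped because a newer value arrived before the handler became free, while 5 arrives when nothing is running and is handled straight away.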

This would work without the multicast operator as well, but I wanted to avoid leaking state variables. It doesn't seem to be entirely possible without them, so there is just one: lastValue. This variable is used only to avoid calling mockDelayedObs twice for the same value after resubscribing to the same chain with repeat().

martin answered Oct 01 '22 12:10