Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Operator similar to exhaustMap but that remembers the last skipped value from the source and executes it in the end

Tags:

rxjs

reactivex

I need an operator similar to exahustMap, but which remembers the last skipped observable and executes it after the current observable completes.

For example, considering the marble diagram for exhaustMap:

exhaustMap marble diagram

In my case, after the blue values are emitted, it will be followed by three values of 50. Of course, in this case it looks just like concatMap, but if there were also a 4 between 3 and 5, it would not reflect in the output.

I've managed to write my own operator similar to how exhaustMap is implemented:

function exhaustLatestMap<T, R>(project: (value: T) => Subscribable<R>): OperatorFunction<T, R> {
    return source => new Observable<R>(observer => 
        source.subscribe(new ExhaustLatestMapOperatorSubscriber(observer, project)));
}

class ExhaustLatestMapOperatorSubscriber<T, R> implements Observer<T> {

    constructor(
        private observer: Subscriber<R>,
        private project: (value: T) => Subscribable<R>) { }

    innerSub: AnonymousSubscription = null;
    latestValue: T;

    next(value: T) {
        this.processNext(value);
    }

    error(err) {
        this.observer.error(err);
    }

    complete() {
        this.observer.complete();
    }

    private processNext(value: T) {
        this.latestValue = value;
        if (!this.innerSub) {
            this.innerSub = this.project(value).subscribe({
                next: v => this.observer.next(v),
                error: err => {
                    this.observer.error(err);
                    this.endInnerSub(value)
                },
                complete: () => {
                    this.endInnerSub(value);
                }
            });
        }
    }

    private endInnerSub(value: T) {
        this.innerSub.unsubscribe();
        this.innerSub = null;
        if (this.latestValue !== value) {
            this.processNext(this.latestValue);
        }
    }
}

But I was wondering if there is a way to implement it just by reusing and combining existing operators. Any ideas?

like image 384
Andrei Vajna II Avatar asked Oct 17 '22 14:10

Andrei Vajna II


1 Answers

It is possible to implement it using just the built-in factories and operators. However, AFAICT, it cannot be done without managing some per-subscription state.

Fortunately, the defer factory function makes managing per-subscription state relatively straightforward and safe. And, in addition to helping manage per-subscription state, defer can be used as a mechanism for being notified of when an observable is subscribed to.

An alternative implementation:

const {
  concat,
  defer,
  EMPTY,
  merge,
  of
} = rxjs;

const {
  delay,
  mergeMap,
  tap
} = rxjs.operators;

const exhaustMapLatest = project => source => defer(() => {
  let latestValue;
  let hasLatestValue = false;
  let isExhausting = false;
  const next = value => defer(() => {
    if (isExhausting) {
      latestValue = value;
      hasLatestValue = true;
      return EMPTY;
    }
    hasLatestValue = false;
    isExhausting = true;
    return project(value).pipe(
      tap({ complete: () => isExhausting = false }),
      s => concat(s, defer(() => hasLatestValue ?
        next(latestValue) :
        EMPTY
      ))
    );
  });
  return source.pipe(mergeMap(next));
});

const source = merge(
  of(0).pipe(delay(0)),
  of(1000).pipe(delay(1000)),
  of(1100).pipe(delay(1100)),
  of(1200).pipe(delay(1200)),
  of(2000).pipe(delay(2000))
);

source.pipe(
  exhaustMapLatest(value => merge(
    of(`${value}:0`).pipe(delay(0)),
    of(`${value}:150`).pipe(delay(150)),
    of(`${value}:300`).pipe(delay(300))
  ))
).subscribe(value => console.log(value));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>

There are a couple of behavioural differences between this implementation and yours:

  • This implementation uses a hasLatestValue flag rather than an equality check, so if the latest value is equal to the initial value, it is still projected.
  • With this implementation, if a subscriber to the resultant observable unsubscribes, any subscription to a projected observable will be unsubscribed, too. With your implementation, the inner subscription will remain subscribed - AFAICT - until the projected observable completes or errors.

I'm not advocating that it should be implemented this way. The answer is just to show an alternative implementation.

like image 63
cartant Avatar answered Oct 21 '22 00:10

cartant