
Operator that skips the next emission from the source whenever another Observable emits

I have a use case where I need an Observable to skip its next emission whenever another notifier Observable emits.

source:    |---X---X---X---X---X---X---X---X---X---X--|>
notifier:  |-------------X---------X----------X-------|>
result:    |---X---X---X-------X---X-------X-------X--|>

Basically, I want an operator called skipNextWhen that takes in the notifier observable and skips the next emission from the source.

I tried using an implementation that uses the pausable operator (re-implemented using switchMap), but couldn't get it to work.

pausable.ts

import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import 'rxjs/add/observable/never';
import 'rxjs/add/operator/startWith';
import 'rxjs/add/operator/switchMap';

declare module 'rxjs/Observable' {
    interface Observable<T> {
        pausable: typeof pausable;
    }
}

function pausable<T>(notifier: Observable<boolean>): Observable<T> {
    return notifier.startWith(false).switchMap((paused) => {
        if (paused) {
            return Observable.never();
        } else {
            const source = new Subject();
            this.subscribe(source);
            return source;
        }
    });
}

Observable.prototype.pausable = pausable;

skipNextWhen.ts

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/merge';
import 'rxjs/add/operator/map';
import './pausable';

declare module 'rxjs/Observable' {
    interface Observable<T> {
        skipNextWhen: typeof skipNextWhen;
    }
}

function skipNextWhen<T, R>(other: Observable<T>): Observable<R> {
    const notifier = Observable.merge(this.map(() => false), 
                                      other.map(() => true));
    return this.pausable(notifier);
}

Observable.prototype.skipNextWhen = skipNextWhen;

Is there a more suitable operator that I should consider using instead? The behavior I'm seeing with my current implementation is that the result Observable emits once, and then never again - even if the notifier Observable never emits.

Brendon Roberto asked Jul 03 '17 14:07


2 Answers

I've started a (very) small library of some rxjs utils I've wanted. It happens to have a function to do exactly what you ask: skipAfter. From the docs:

source: -1-----2-----3-----4-----5-|
skip$:  ----0----------0-0----------

result: -1-----------3-----------5-|

The library is here: https://github.com/simontonsoftware/s-rxjs-utils

Eric Simonton answered Oct 19 '22 13:10


I can think of two solutions to this:

  1. Using .filter(), .do() and a few side-effects.

    This is maybe the easier solution to understand, even though it's not the "Rx" way:

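    function skipNextWhen(other) {
        // Flag shared by both branches of the chain below.
        let skipNext = false;
    
        // other only sets the flag; filter(() => false) drops its values.
        return this.merge(other.do(() => skipNext = true).filter(() => false))
            .filter(val => {
                // Skip this value if the flag was set, then reset the flag.
                const doSkip = skipNext;
                skipNext = false;
                return !doSkip;
            });
    }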
    

    I'm using merge() only to update skipNext; the values emitted by other are always filtered out.

  2. Using .scan():

    This solution avoids external state variables and side effects.

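    function skipNextWhen(other) {
        const SKIP = 'skip';
    
        return this.merge(other.mapTo(SKIP))
            .scan((acc, val) => {
                if (val === SKIP) {
                    // Check the marker first so repeated SKIPs still skip the next value.
                    return SKIP;
                } else if (acc === SKIP) {
                    // The previous emission was SKIP: drop this value.
                    return null;
                } else {
                    return val;
                }
            }, [])
            // Compare against null instead of using Boolean(val), so falsy values such as 0 pass through.
            .filter(val => val !== null && val !== SKIP);
    }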
    

    Basically, when SKIP arrives I return it right away, because scan() passes it back in the acc parameter on the next emission; the SKIP itself is then dropped by filter().

    If I receive a normal value but the previous value was SKIP, I ignore it and return null, which is later filtered out.
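The rule that both solutions implement can be checked without RxJS by folding over a time-ordered list of events. The following is only a minimal sketch: the `TimelineEvent` type, the `skipNextModel` function, and the sample timeline are hypothetical names and data made up for illustration, not part of RxJS. It assumes that several notifier emissions in a row should still skip only one source value.

```typescript
// Hypothetical model types for illustration; these are not RxJS types.
type TimelineEvent<T> =
    | { kind: 'value'; time: number; value: T }
    | { kind: 'skip'; time: number };

// Walks the events in time order and drops the first source value
// that follows one or more skip events.
function skipNextModel<T>(events: TimelineEvent<T>[]): T[] {
    const sorted = events.slice().sort((a, b) => a.time - b.time);
    const result: T[] = [];
    let skipNext = false;

    for (const event of sorted) {
        if (event.kind === 'skip') {
            skipNext = true;          // remember that the next value must go
        } else if (skipNext) {
            skipNext = false;         // drop this value and reset the flag
        } else {
            result.push(event.value); // normal value, pass it through
        }
    }
    return result;
}

// Made-up timeline: values 1..5, with skip events at times 4, 15 and 17.
const timeline: TimelineEvent<number>[] = [
    { kind: 'value', time: 1, value: 1 },
    { kind: 'skip', time: 4 },
    { kind: 'value', time: 7, value: 2 },
    { kind: 'value', time: 13, value: 3 },
    { kind: 'skip', time: 15 },
    { kind: 'skip', time: 17 },
    { kind: 'value', time: 19, value: 4 },
    { kind: 'value', time: 25, value: 5 },
];

const modelResult = skipNextModel(timeline);
console.log(modelResult); // modelResult is [1, 3, 5]
```

Values 2 and 4 are dropped because each is the first source value after a skip event; the two back-to-back skip events at times 15 and 17 still remove only value 4.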

Both solutions give the same result for the following example:

Observable.prototype.skipNextWhen = skipNextWhen;

const source = Observable.range(1, 10)
    .concatMap(val => Observable.of(val).delay(100));

source
    .skipNextWhen(Observable.interval(350))
    .subscribe(console.log);

This prints the following:

1
2
3
5
6
8
9
10

Just be aware that you're not in fact creating a new operator; you just have a shortcut for an operator chain. For example, this doesn't let you unsubscribe from other when the source completes.
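To illustrate what a dedicated operator could do differently, here is a dependency-free sketch of the teardown behavior. It deliberately does not use RxJS: `Listener`, `Producer`, `skipNextWhenModel`, and `createManual` are hypothetical stand-ins invented for this example, where a producer is just a function that takes a listener and returns a teardown function. The point it tries to show is that the notifier is released as soon as the source completes.

```typescript
// Hypothetical minimal stand-ins for an observable; not RxJS types.
interface Listener<T> {
    next(value: T): void;
    complete(): void;
}
type Teardown = () => void;
type Producer<T> = (listener: Listener<T>) => Teardown;

function skipNextWhenModel<T>(source: Producer<T>, other: Producer<unknown>): Producer<T> {
    return listener => {
        let skipNext = false;
        let otherStopped = false;

        // Subscribe to the notifier first; it only sets the flag.
        const stopOtherRaw = other({
            next: () => { skipNext = true; },
            complete: () => { /* notifier finishing does not end the result */ },
        });
        // Guard so the notifier is torn down at most once.
        const stopOther = () => {
            if (!otherStopped) {
                otherStopped = true;
                stopOtherRaw();
            }
        };

        const stopSource = source({
            next: value => {
                if (skipNext) {
                    skipNext = false;     // drop this value and reset the flag
                } else {
                    listener.next(value);
                }
            },
            complete: () => {
                stopOther();              // release the notifier when the source is done
                listener.complete();
            },
        });

        // Manual unsubscription tears down both sides.
        return () => {
            stopSource();
            stopOther();
        };
    };
}

// Hypothetical helper: a producer that is driven by hand, for the demo below.
function createManual<T>() {
    let current: Listener<T> | null = null;
    let active = false;
    const producer: Producer<T> = listener => {
        current = listener;
        active = true;
        return () => { active = false; current = null; };
    };
    return {
        producer,
        next: (value: T) => { if (current) { current.next(value); } },
        complete: () => { if (current) { current.complete(); } },
        isActive: () => active,
    };
}

const src = createManual<number>();
const notifier = createManual<string>();
const received: number[] = [];
let completed = false;

skipNextWhenModel(src.producer, notifier.producer)({
    next: value => { received.push(value); },
    complete: () => { completed = true; },
});

src.next(1);
notifier.next('x'); // the next source value will be skipped
src.next(2);
src.next(3);
src.complete();     // also tears down the notifier subscription
```

After the last call, `received` holds 1 and 3, and `notifier.isActive()` is false because the notifier was released on completion. In a merge()-based chain the notifier subscription would instead stay open until the notifier itself completes or the subscriber unsubscribes.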

martin answered Oct 19 '22 12:10