
RxJS throttle same value but let new values through

"Here you go," someone says, and you are given this input stream of values that you want to apply distinctUntilChanged() to...

Input:  '1-1----11---2--1122----1---2---2-2-1-2---|'
Output: '1-----------2--1-2-----1---2-------1-2---|'

Nothing weird so far.
But now someone says "it's okay" if the same value comes again, "but only if it's not too soon!". I want at least '----' ticks between two occurrences of the same value. "Okay," you say, and you add a throttle:

import { Subject, interval } from 'rxjs';
import { throttle } from 'rxjs/operators';

const source = new Subject<number>();

// a mysterious cave troll randomly calls source.next(oneOrTwo)

const example = source.pipe(throttle(() => interval(4000)));

Input:  '1-1----11---2--1122----1---2---2-2-1-2-----|'
Output: '1------1----2----2-----1-------2-----2-----|'

"That's not what I want! Look at all the values you missed," they say, referring to the fact that the throttle applies to every value in the stream rather than to each distinct value separately.

Input:  '1-1----11---2--1122----1---2---2-2-1-2-----|'
Output: '1------1----2----2-----1-------2-----2-----|'
        '-------------->1<--------->2<----->1<------|' <-- Missed values

"Here, let me show you," the mysterious man says, and gives you this:

Wanted output

Input:  '1-1----11---2--1112----1---2---2-2-1-2-----|'
Output: '1------1----2--1--2----1---2-----2-1-------|'
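
One way to pin down the rule behind the wanted output is to replay it on the marble strings themselves. The sketch below is plain TypeScript, not RxJS; perValueThrottle and its minGap parameter are hypothetical names. It assumes every character is one tick, and that a repeated value is only let through after at least minGap quiet ticks since that same value was last emitted.

```typescript
// Hypothetical helper, not part of RxJS: per-value throttle on a marble string.
// Assumption: one character = one tick; '-' is silence and '|' is completion.
function perValueThrottle(marble: string, minGap: number): string {
  // value -> tick at which that value was last emitted
  const lastEmitted = new Map<string, number>();
  let out = '';
  for (let tick = 0; tick < marble.length; tick++) {
    const ch = marble[tick];
    if (ch === '-' || ch === '|') {
      out += ch; // pass silence and completion through unchanged
      continue;
    }
    const last = lastEmitted.get(ch);
    // number of ticks strictly between the last emission of ch and now
    const quietTicks = last === undefined ? Infinity : tick - last - 1;
    if (quietTicks >= minGap) {
      lastEmitted.set(ch, tick);
      out += ch; // far enough from the previous emission of this value
    } else {
      out += '-'; // too soon for this particular value, drop it
    }
  }
  return out;
}

const wanted = perValueThrottle('1-1----11---2--1112----1---2---2-2-1-2-----|', 4);
console.log(wanted); // 1------1----2--1--2----1---2-----2-1-------|
```

Note that the gap is measured from the last emitted occurrence of a value, not the last seen one; that is what lets the 2 at tick 33 through even though a 2 was dropped at tick 31.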

My feeling is that a combined window wouldn't do it.

A question for someone more experienced:
is this a hard problem to solve? (Or have I missed an obvious solution?)

erhise asked Dec 05 '18 00:12


1 Answer

At first I tried to somehow combine distinctUntilChanged() and throttleTime(), but I could not come up with a solution that way, so I tried something else.

The operator I came up with is throttleDistinct(), which works the way you would like: StackBlitz Editor Link

It takes two parameters:

  1. duration: number, in milliseconds, similar to the duration in throttleTime(duration: number)
  2. equals: (a: T, b: T) => boolean, a function that compares the previous item with the next one; it defaults to (a, b) => a === b

import { fromEvent, Observable } from 'rxjs';
import { map, scan, filter } from 'rxjs/operators';

const source = fromEvent(document, 'keypress')
  .pipe(map((x: any) => x.keyCode as number))

source
  .pipe(
    throttleDistinct(1000),
  )
  .subscribe((x) => console.log('__subscribe__', x));

export function throttleDistinct<T>(
  duration: number,
  equals: (a: T, b: T) => boolean = (a, b) => a === b
) {
  return (source: Observable<T>) => {
    return source
      .pipe(
        map((x) => {
          const obj = { val: x, time: Date.now(), keep: true };
          return obj;
        }),
        scan((acc, cur) => {
          const diff = cur.time - acc.time;

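          const isSame = equals(acc.val, cur.val);
          // keep the item once the window has passed, or whenever the value
          // differs from the last kept one (this also covers diff === duration)
          return diff > duration || !isSame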
            ? { ...cur, keep: true }
            : { ...acc, keep: false };
        }),
        filter((x) => x.keep),
        map((x) => x.val),
      )
  }
}
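
To see which items the scan step keeps, the same decision can be replayed over timestamped values without RxJS. This is only a simplified sketch: replayThrottleDistinct and the Stamped type are hypothetical names, and the timestamps are made up for illustration. It keeps an item when more than duration has passed since the last kept item, or when the value differs from that last kept item.

```typescript
// Hypothetical types and helper, written only to trace the keep/drop decision.
interface Stamped<T> {
  val: T;
  time: number; // milliseconds, standing in for Date.now()
}

function replayThrottleDistinct<T>(
  items: Stamped<T>[],
  duration: number,
  equals: (a: T, b: T) => boolean = (a, b) => a === b
): T[] {
  const kept: T[] = [];
  let last: Stamped<T> | undefined = undefined; // last item that was kept
  for (const cur of items) {
    const keep =
      last === undefined ||                // the first item always passes
      cur.time - last.time > duration ||   // the window has elapsed
      !equals(last.val, cur.val);          // or the value changed
    if (keep) {
      kept.push(cur.val);
      last = cur; // dropped items do not move the reference point
    }
  }
  return kept;
}

const trace = replayThrottleDistinct(
  [
    { val: 1, time: 0 },
    { val: 1, time: 200 },  // same value, too soon: dropped
    { val: 2, time: 400 },  // new value: kept
    { val: 1, time: 600 },  // differs from the last kept value (2): kept
    { val: 1, time: 2000 }, // same value, but the window has elapsed: kept
  ],
  1000
);
console.log(trace); // [ 1, 2, 1, 1 ]
```

The fourth item shows that the comparison is only against the most recently kept value: a 1 is let through 600 ms after the previous 1 because a 2 arrived in between. If each value needs its own window, the last emission time would have to be tracked per value.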

Goga Koreli answered Oct 20 '22 00:10