Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJS: An Observable that takes its own old value as input

I'm trying to implement something like this:

// This obviously doesn't work, because we try to refer to `a` before it exists.
// c and m are some Observables.
const aSampled = a.pipe(
  rxjs.operators.sample(c),
  rxjs.operators.startWith(aInitial)
);
const a = m.pipe(
  rxjs.operators.withLatestFrom(aSampled),
  map(([mValue, oldAValue]) => {
    // Do something.
  }),
  rxjs.operators.startWith(aInitial)
);

Now that's obivously uncompilable nonsense, since we try to sample a before creating it, but hopefully it makes my intent clear: Every emitted value from a should depend on one of the old values previously emitted by a. Which old value of a, that's decided by when was the last time that c emitted something. It's kinda like calling pairwise on a, except that I don't want the last two values, but the latest and another from further back.

Note that if not for the startWith(aInitial) bits, this wouldn't even be a well-defined question, since the first value emitted by a would be defined cyclically, refering to itself. However, as long as the first value a is specified separately, the construction makes mathematical sense. I just don't know how to implement it in code in a clean way. My hunch is that there would be a long-winded way of doing this by writing a custom Subject of some kind, but something more elegant would be very welcome.

To make this a bit more concrete, in my use case I'm dealing with a click-and-drag-to-pan type of UI element. m is an Observable of mousemove events, c is an Observable of mousedown events. a would then be something that keeps changing based where the cursor is, and what the value of a was when the click-down happened.

like image 595
user619051 Avatar asked Oct 27 '22 08:10

user619051


2 Answers

If I understand right, the basic streams of events are mousemove and a mousedown.

Based on these streams of events, you have to calculate a new stream a, which emits with the same frequency of mousemove but whose data are the result of some calculation based on the current position of the mouse and the value a had when the last mousedown emitted.

So, if this is true, we can simulate mousemove and mousedown with the following Observables

// the mouse is clicked every 1 second
const c = interval(1000).pipe(
    tap(cVal => console.log('click', cVal))
);
// the mouse moves diagonally and emits every 200 ms
const m = interval(200).pipe(
    map(mVal => [mVal, mVal]),
);

What we need is to have somehow at hand the value of the Observable a when mousedown emits. How can we get this?

Let's assume we have a BehaviourSubject called value_of_a whose initial value is 1 and that holds the value of a. If we had such Observable, we could get the its value when mousedown emits simply like this

const last_relevant_a = c.pipe(       // when mousedown emits
    switchMap(() => value_of_a.pipe(  // the control is switched to the Observable value_of_a
        take(1),                      // and we consider only its first value
    )),
);

With m, the mousemove Observable, and last_relevant_a we have all the Observable that we need. In fact we have just to combine their latest emissions to have all the elements we need to calculate the new value of a.

const a = combineLatest(m, last_relevant_a)
.submit(
   ([mouseVal, old_a_val] => {
      // do something to calculate the new value of a
   }
);

Now the only thing left to do is to make sure that value_of_a emits any value emitted by a. This can be accomplished calling next on value_of_a within the subscription of a itself.

Stitching it all together, the solution could be something like

const c = interval(1000).pipe(
    tap(cVal => console.log('click', cVal))
);

const m = interval(200).pipe(
    map(mVal => [mVal, mVal]),
);

const value_of_a = new BehaviorSubject<number>(1);

const last_relevant_a = c.pipe(
    switchMap(cVal => value_of_a.pipe(
        take(1),
    )),
);

const a = combineLatest(m, last_relevant_a);

a.pipe(
    take(20)
)
.subscribe(
    val => {
        // new value of a calculated with an arbitrary logic
        const new_value_of_a = val[0][0] * val[0][1] * val[1];
        // the new value of a is emitted by value_of_a
        value_of_a.next(new_value_of_a);
    }
)

Probably this is also a use case for the expand operator, but it should be investigated.

like image 120
Picci Avatar answered Nov 15 '22 13:11

Picci


You could create a new subject called a previous subject based off a behavior subject.

The source to a behavior subject is here https://github.com/ReactiveX/rxjs/blob/master/src/internal/BehaviorSubject.ts.

So lets create a previous subject that emits the current value along with the previous value.

import { Subject } from 'rxjs';
import { Subscriber } from 'rxjs';
import { Subscription } from 'rxjs';
import { SubscriptionLike } from 'rxjs';
import { ObjectUnsubscribedError } from 'rxjs';

export class PreviousSubject<T> extends Subject<T[]> {

  _value: T[];

  constructor(value: T, previous?: T) {
    super();
    this._value = [value, previous];
  }

  get value(): T[] {
    return this.getValue();
  }

  /** @deprecated This is an internal implementation detail, do not use. */
  _subscribe(subscriber: Subscriber<T[]>): Subscription {
    const subscription = super._subscribe(subscriber);
    if (subscription && !(<SubscriptionLike>subscription).closed) {
      subscriber.next(this._value);
    }
    return subscription;
  }

  getValue(): T[] {
    if (this.hasError) {
      throw this.thrownError;
    } else if (this.closed) {
      throw new ObjectUnsubscribedError();
    } else {
      return this._value;
    }
  }

  next(value: T): void {
    this._value = [value, this._value[0]];
    super.next(this._value);
  }
}

See a demo at https://stackblitz.com/edit/typescript-wiayqx

Or vanilla JS if you don't want TypeScript

const { Subject } = rxjs;

class PreviousSubject extends Subject {

  _value;

  constructor(value, previous) {
    super();
    this._value = [value, previous];
  }

  get value() {
    return this.getValue();
  }

  _subscribe(subscriber) {
    const subscription = super._subscribe(subscriber);
    if (subscription && !(subscription).closed) {
      subscriber.next(this._value);
    }
    return subscription;
  }

  getValue() {
    if (this.hasError) {
      throw this.thrownError;
    } else if (this.closed) {
      throw new ObjectUnsubscribedError();
    } else {
      return this._value;
    }
  }

  next(value) {
    this._value = [value, this._value[0]];
    super.next(this._value);
  }
}

let ps$ = new PreviousSubject('Start value');

ps$.subscribe(([current, previous]) => {
  console.log(`Current: ${current}, Previous: ${previous}`);
});

ps$.next('Value');
ps$.next('Another value');
ps$.next('Another value again');
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>
like image 34
Adrian Brand Avatar answered Nov 15 '22 11:11

Adrian Brand