I'm trying to implement something like this:
// This obviously doesn't work, because we try to refer to `a` before it exists.
// c and m are some Observables.
const aSampled = a.pipe(
  rxjs.operators.sample(c),
  rxjs.operators.startWith(aInitial)
);
const a = m.pipe(
  rxjs.operators.withLatestFrom(aSampled),
  rxjs.operators.map(([mValue, oldAValue]) => {
    // Do something.
  }),
  rxjs.operators.startWith(aInitial)
);
Now that's obviously uncompilable nonsense, since we try to sample `a` before creating it, but hopefully it makes my intent clear: every value emitted by `a` should depend on one of the old values previously emitted by `a`. Which old value of `a` is used is decided by the last time `c` emitted something. It's kind of like calling `pairwise` on `a`, except that I don't want the last two values, but the latest one and another from further back.
Note that if not for the `startWith(aInitial)` bits, this wouldn't even be a well-defined question, since the first value emitted by `a` would be defined cyclically, referring to itself. However, as long as the first value of `a` is specified separately, the construction makes mathematical sense. I just don't know how to implement it in code in a clean way. My hunch is that there would be a long-winded way of doing this by writing a custom Subject of some kind, but something more elegant would be very welcome.
To make this a bit more concrete, in my use case I'm dealing with a click-and-drag-to-pan type of UI element. `m` is an Observable of `mousemove` events, and `c` is an Observable of `mousedown` events. `a` would then be something that keeps changing based on where the cursor is and on what the value of `a` was when the mouse button was pressed.
If I understand correctly, the basic streams of events are `mousemove` and `mousedown`.
Based on these streams of events, you have to calculate a new stream `a`, which emits with the same frequency as `mousemove` but whose data are the result of some calculation based on the current position of the mouse and the value `a` had when `mousedown` last emitted.
If so, we can simulate `mousemove` and `mousedown` with the following Observables:
// the mouse is clicked every 1 second
const c = interval(1000).pipe(
  tap(cVal => console.log('click', cVal))
);
// the mouse moves diagonally and emits every 200 ms
const m = interval(200).pipe(
  map(mVal => [mVal, mVal]),
);
What we need is to somehow have at hand the value of the Observable `a` at the moment `mousedown` emits. How can we get this?
Let's assume we have a `BehaviorSubject` called `value_of_a` whose initial value is `1` and which holds the current value of `a`. If we had such an Observable, we could get its value when `mousedown` emits simply like this:
const last_relevant_a = c.pipe(       // when mousedown emits
  switchMap(() => value_of_a.pipe(    // the control is switched to the Observable value_of_a
    take(1),                          // and we consider only its first value
  )),
);
With `m`, the mousemove Observable, and `last_relevant_a`, we have all the Observables we need. We just have to combine their latest emissions to get everything required to calculate the new value of `a`.
const a = combineLatest(m, last_relevant_a).pipe(
  map(([mouseVal, old_a_val]) => {
    // do something to calculate the new value of a
  })
);
Now the only thing left to do is to make sure that `value_of_a` emits every value emitted by `a`. This can be accomplished by calling `next` on `value_of_a` within the subscription to `a` itself.
Stitching it all together, the solution could be something like:
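import { BehaviorSubject, combineLatest, interval } from 'rxjs';
import { map, switchMap, take, tap } from 'rxjs/operators';

// simulated mousedown: one click every second
const c = interval(1000).pipe(
  tap(cVal => console.log('click', cVal))
);
// simulated mousemove: a diagonal move every 200 ms
const m = interval(200).pipe(
  map(mVal => [mVal, mVal]),
);
// holds the latest value of a, starting from the initial value 1
const value_of_a = new BehaviorSubject<number>(1);
// the value a had when the last click happened
const last_relevant_a = c.pipe(
  switchMap(cVal => value_of_a.pipe(
    take(1),
  )),
);
const a = combineLatest(m, last_relevant_a);
a.pipe(
  take(20)
)
.subscribe(
  val => {
    // new value of a calculated with an arbitrary logic
    const new_value_of_a = val[0][0] * val[0][1] * val[1];
    // the new value of a is emitted by value_of_a
    value_of_a.next(new_value_of_a);
  }
);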
This is probably also a use case for the `expand` operator, but that would need to be investigated.
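Another option that may be worth considering, instead of feeding values back through a Subject, is to fold both event streams into a single piece of state that remembers the latest value of `a` and the value `a` had when `c` last fired. The sketch below is plain TypeScript with no RxJS dependency, so it only models the logic on a fixed list of events. The names `Ev`, `State`, `step`, `replay`, `timeline` and `emittedValues` are hypothetical and introduced here for illustration, and the `combine` function stands in for the "do something" step of the question.

```typescript
// Hypothetical names throughout: Ev, State, step and replay are not RxJS APIs.

// A tagged event: either `c` fired, or `m` emitted a value.
type Ev<M> = { kind: "c" } | { kind: "m"; value: M };

interface State<A> {
  current: A; // latest value of a
  sampled: A; // value a had the last time c fired
}

// Builds a reducer from the "do something" function of the question.
function step<M, A>(combine: (m: M, oldA: A) => A) {
  return (state: State<A>, ev: Ev<M>): State<A> =>
    ev.kind === "c"
      ? { current: state.current, sampled: state.current }
      : { current: combine(ev.value, state.sampled), sampled: state.sampled };
}

// Replays a list of events and collects every value a would emit.
function replay<M, A>(
  events: Ev<M>[],
  aInitial: A,
  combine: (m: M, oldA: A) => A
): A[] {
  const reducer = step(combine);
  let state: State<A> = { current: aInitial, sampled: aInitial };
  const emitted: A[] = [aInitial]; // plays the role of startWith(aInitial)
  for (const ev of events) {
    state = reducer(state, ev);
    if (ev.kind === "m") emitted.push(state.current); // a emits only when m does
  }
  return emitted;
}

// Assumed example: a is a number and each move is added to the sampled value.
const timeline: Ev<number>[] = [
  { kind: "m", value: 1 },
  { kind: "m", value: 2 },
  { kind: "c" },
  { kind: "m", value: 3 },
  { kind: "c" },
  { kind: "m", value: 1 },
];

const emittedValues = replay<number, number>(timeline, 0, (m, oldA) => oldA + m);
console.log(emittedValues); // [0, 1, 2, 5, 6]
```

In RxJS, the same reducer could presumably be passed to `scan` over a `merge` of `c` and `m` mapped to tagged events, with a filter so that only `m` events produce output. I haven't tested that wiring, so treat it as an assumption rather than a verified solution.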
You could create a new kind of subject, call it a `PreviousSubject`, modeled on `BehaviorSubject`.
The source of `BehaviorSubject` is here: https://github.com/ReactiveX/rxjs/blob/master/src/internal/BehaviorSubject.ts.
So let's create a `PreviousSubject` that emits the current value along with the previous value.
import {
  Subject,
  Subscriber,
  Subscription,
  SubscriptionLike,
  ObjectUnsubscribedError
} from 'rxjs';

export class PreviousSubject<T> extends Subject<T[]> {
  // [current, previous]
  _value: T[];

  constructor(value: T, previous?: T) {
    super();
    this._value = [value, previous];
  }

  get value(): T[] {
    return this.getValue();
  }

  /** @deprecated This is an internal implementation detail, do not use. */
  _subscribe(subscriber: Subscriber<T[]>): Subscription {
    const subscription = super._subscribe(subscriber);
    // replay the stored pair to each new subscriber, as BehaviorSubject does
    if (subscription && !(<SubscriptionLike>subscription).closed) {
      subscriber.next(this._value);
    }
    return subscription;
  }

  getValue(): T[] {
    if (this.hasError) {
      throw this.thrownError;
    } else if (this.closed) {
      throw new ObjectUnsubscribedError();
    } else {
      return this._value;
    }
  }

  next(value: T): void {
    // the old current value becomes the new previous value
    this._value = [value, this._value[0]];
    super.next(this._value);
  }
}
See a demo at https://stackblitz.com/edit/typescript-wiayqx
Or vanilla JS if you don't want TypeScript
const { Subject, ObjectUnsubscribedError } = rxjs;

class PreviousSubject extends Subject {
  // [current, previous]
  _value;

  constructor(value, previous) {
    super();
    this._value = [value, previous];
  }

  get value() {
    return this.getValue();
  }

  _subscribe(subscriber) {
    const subscription = super._subscribe(subscriber);
    // replay the stored pair to each new subscriber
    if (subscription && !subscription.closed) {
      subscriber.next(this._value);
    }
    return subscription;
  }

  getValue() {
    if (this.hasError) {
      throw this.thrownError;
    } else if (this.closed) {
      throw new ObjectUnsubscribedError();
    } else {
      return this._value;
    }
  }

  next(value) {
    // the old current value becomes the new previous value
    this._value = [value, this._value[0]];
    super.next(this._value);
  }
}

let ps$ = new PreviousSubject('Start value');
ps$.subscribe(([current, previous]) => {
  console.log(`Current: ${current}, Previous: ${previous}`);
});
ps$.next('Value');
ps$.next('Another value');
ps$.next('Another value again');
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>