Are there drawbacks to using/mutating a variable from a custom operator closure in RxJS? I realize it violates the "pure" function principle and that you can use scan for this simple example, but I'm asking specifically for tangible technical issues with the underlying pattern below:
    const custom = () => {
      let state = 0;
      return pipe(
        map(next => state * next),
        tap(_ => state += 1),
        share()
      )
    }
    // Usage
    const obs = interval(1000).pipe(custom())
    obs.subscribe()
There are at least two problems with the way you've stored state within your custom operator.
The first problem is that doing so means the operator is no longer referentially transparent. That is, if the call to the operator is replaced with the operator's return value, the behaviour is different. In the snippet below, the first use logs 0 and 2, whereas the second use logs 2 and 6, because state carries over from the first use:
    // Assumes the RxJS 6 UMD bundle is loaded, exposing the global rxjs:
    // https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js
    const { pipe, range } = rxjs;
    const { map, share, tap } = rxjs.operators;
    const custom = () => {
      let state = 0;
      return pipe(
        map(next => state * next),
        tap(_ => state += 1),
        share()
      );
    };
    // The same operator instance is reused, so state is reused too.
    const op = custom();
    console.log("first use:");
    range(1, 2).pipe(op).subscribe(n => console.log(n));
    console.log("second use:");
    range(1, 2).pipe(op).subscribe(n => console.log(n));
The second problem, as mentioned in the other answer, is that different subscriptions will receive different values in their next notifications, as the state within the operator is shared.
For example, if the source observable is synchronous, consecutive subscriptions will see different values (0 and 2 for the first subscription, 2 and 6 for the second):
    // Assumes the RxJS 6 UMD bundle is loaded, exposing the global rxjs.
    const { pipe, range } = rxjs;
    const { map, share, tap } = rxjs.operators;
    const custom = () => {
      let state = 0;
      return pipe(
        map(next => state * next),
        tap(_ => state += 1),
        share()
      );
    };
    // One observable, subscribed to twice in a row.
    const source = range(1, 2).pipe(custom());
    console.log("first subscription:");
    source.subscribe(n => console.log(n));
    console.log("second subscription:");
    source.subscribe(n => console.log(n));
However, it is possible to write an operator very similar to your custom operator and have it behave correctly in all circumstances. To do so, it's necessary to ensure that any state within the operator is per-subscription.
A pipeable operator is just a function that takes an observable and returns an observable, so you could use defer to ensure that your state is per-subscription. With the version below, every use and every subscription logs 0 and 2:
    // Assumes the RxJS 6 UMD bundle is loaded, exposing the global rxjs.
    const { defer, pipe, range } = rxjs;
    const { map, share, tap } = rxjs.operators;
    const custom = () => {
      // defer runs its factory on each subscription, so state starts at 0 each time.
      return source => defer(() => {
        let state = 0;
        return source.pipe(
          map(next => state * next),
          tap(_ => state += 1)
        );
      }).pipe(share());
    };
    const op = custom();
    console.log("first use:");
    range(1, 2).pipe(op).subscribe(n => console.log(n));
    console.log("second use:");
    range(1, 2).pipe(op).subscribe(n => console.log(n));
    const source = range(1, 2).pipe(op);
    console.log("first subscription:");
    source.subscribe(n => console.log(n));
    console.log("second subscription:");
    source.subscribe(n => console.log(n));
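The difference between the two versions comes down to when the closure variable is created. As a rough illustration only, here is a dependency-free toy model in plain JavaScript. It is not RxJS: an "observable" is modelled as a function that takes a next callback, and the names rangeOf, sharedState, perSubscriptionState and collect are hypothetical helpers invented for this sketch.

```javascript
// Toy model, not RxJS: an "observable" is a function that takes a
// next callback and pushes values to it synchronously.
const rangeOf = (start, count) => next => {
  for (let i = 0; i < count; i++) next(start + i);
};

// State is created when the operator is created, so it is shared by
// every use of the operator and by every subscription.
const sharedState = () => {
  let state = 0;
  return source => next => source(value => {
    next(state * value);
    state += 1;
  });
};

// State is created when a subscription starts, so each subscription
// gets its own counter. This mirrors what defer achieves above.
const perSubscriptionState = () => source => next => {
  let state = 0;
  source(value => {
    next(state * value);
    state += 1;
  });
};

// Subscribes and collects everything that was emitted.
const collect = observable => {
  const values = [];
  observable(value => values.push(value));
  return values;
};

const sharedOp = sharedState();
const sharedFirst = collect(sharedOp(rangeOf(1, 2)));   // [0, 2]
const sharedSecond = collect(sharedOp(rangeOf(1, 2)));  // [2, 6]

const isolatedOp = perSubscriptionState();
const isolatedFirst = collect(isolatedOp(rangeOf(1, 2)));   // [0, 2]
const isolatedSecond = collect(isolatedOp(rangeOf(1, 2)));  // [0, 2]
```

The only change between the two operators is the position of the let state = 0 line relative to the function that starts a subscription, which is the same move the defer-based operator makes.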
As you already stated, you lose some of the advantages of pure functions. In this particular case, you run the risk of late subscribers getting different streams of data than you may expect (depends on what you are doing in your real case vs. in this constructed one).
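For instance, by adding late subscribers, stream 'A' would see 0 and 1. Stream 'B' would see only 1 (it misses 0 because it joins the shared subscription that 'A' has already started). Stream 'C' subscribes after 'A' and 'B' have finished, so share resubscribes to the source just as it did for 'A', but state is not reset: it is still 2 from the earlier emissions, so 'C' would see 0, 3 and 8.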
    const { interval, pipe } = Rx;
    const { take, map, tap, share } = RxOperators;
    const custom = () => {
      let state = 0;
      return pipe(
        map(next => state * next),
        tap(_ => state += 1),
        share()
      )
    }
    // Late subscribers can get different streams
    const obs = interval(500).pipe(custom())
    const sub1 = obs.pipe(take(2)).subscribe((x) => console.log('A', x))
    setTimeout(() => obs.pipe(take(1)).subscribe((x) => console.log('B', x)), 500)
    setTimeout(() => obs.pipe(take(3)).subscribe((x) => console.log('C', x)), 3000)
Whether this is acceptable or expected behavior will depend on your use case. While it is good to try and use pure functions for all of their advantages, sometimes it isn't practical or appropriate for your use case.