
RxJS Custom Operator Internal Variables

Are there drawbacks to using/mutating a variable from a custom operator closure in RxJS? I realize it violates the "pure" function principle and that you can use scan for this simple example, but I'm asking specifically about tangible technical issues with the underlying pattern below:

const custom = () => {

  let state = 0; 

  return pipe(
    map(next => state * next),
    tap(_ => state += 1),
    share()
  )
}

// Usage
const obs = interval(1000).pipe(custom())

obs.subscribe()
JeffD23 asked Aug 25 '18 18:08


2 Answers

There are at least two problems with the way you've stored state within your custom operator.

The first problem is that the operator is no longer referentially transparent. That is, if each call to the operator is replaced with the value returned by a single call (as with op below), the behaviour is different:

const { pipe, range } = rxjs;
const { map, share, tap } = rxjs.operators;

const custom = () => {
  let state = 0; 
  return pipe(
    map(next => state * next),
    tap(_ => state += 1),
    share()
  );
};

const op = custom();
console.log("first use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));
console.log("second use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>

The second problem - as mentioned in the other answer - is that different subscriptions will receive different values in their next notifications, as the state within the operator is shared.

For example, if the source observable is synchronous, consecutive subscriptions will see different values:

const { pipe, range } = rxjs;
const { map, share, tap } = rxjs.operators;

const custom = () => {
  let state = 0; 
  return pipe(
    map(next => state * next),
    tap(_ => state += 1),
    share()
  );
};

const source = range(1, 2).pipe(custom());
console.log("first subscription:");
source.subscribe(n => console.log(n));
console.log("second subscription:");
source.subscribe(n => console.log(n));
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>

However, it is possible to write an operator very similar to your custom operator and have it behave correctly in all circumstances. To do so, it's necessary to ensure that any state within the operator is per-subscription.

A pipeable operator is just a function that takes an observable and returns an observable, so you could use defer to ensure that your state is per-subscription, like this:

const { defer, pipe, range } = rxjs;
const { map, share, tap } = rxjs.operators;

const custom = () => {
  return source => defer(() => {
    let state = 0; 
    return source.pipe(
      map(next => state * next),
      tap(_ => state += 1)
    );
  }).pipe(share());
};

const op = custom();
console.log("first use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));
console.log("second use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));

const source = range(1, 2).pipe(op);
console.log("first subscription:");
source.subscribe(n => console.log(n));
console.log("second subscription:");
source.subscribe(n => console.log(n));
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>
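
To see why it matters where the state variable is created, the difference can be reduced to plain JavaScript closures. What follows is only a minimal sketch and does not use RxJS: an "observable" is assumed to be nothing more than a function that accepts a next callback, and fakeRange, sharedState, perSubscriptionState and collect are hypothetical names invented for the illustration. It says nothing about how RxJS itself is implemented.

```javascript
// Stand-in for a synchronous source: calling it "subscribes" and pushes values.
// Hypothetical helper, not part of RxJS.
const fakeRange = (start, count) => (next) => {
  for (let i = 0; i < count; i++) {
    next(start + i);
  }
};

// State is created once, when the operator is created,
// so every subscription reads and mutates the same variable.
const sharedState = () => {
  let state = 0;
  return (source) => (next) =>
    source((value) => {
      next(state * value);
      state += 1;
    });
};

// State is created inside the subscribe function,
// so each subscription starts from zero (the role defer plays above).
const perSubscriptionState = () => (source) => (next) => {
  let state = 0;
  source((value) => {
    next(state * value);
    state += 1;
  });
};

// Subscribes once and returns everything that was emitted.
const collect = (observable) => {
  const seen = [];
  observable((value) => seen.push(value));
  return seen;
};

const shared = sharedState()(fakeRange(1, 2));
const isolated = perSubscriptionState()(fakeRange(1, 2));

const sharedRuns = [collect(shared), collect(shared)];
const isolatedRuns = [collect(isolated), collect(isolated)];

console.log(sharedRuns);   // [[0, 2], [2, 6]]
console.log(isolatedRuns); // [[0, 2], [0, 2]]
```

The second shared run starts from the counter left behind by the first, while each isolated run starts from zero. That is the same contrast the RxJS snippets above show between closure state and state created inside defer.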
cartant answered Oct 23 '22 14:10


As you already stated, you lose some of the advantages of pure functions. In this particular case, you run the risk of late subscribers getting different streams of data than you may expect (depends on what you are doing in your real case vs. in this constructed one).

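For instance, by adding late subscribers, stream 'A' would see 0 and 1. Stream 'B' would see only 1 (it skips 0 because obs is still active from the 'A' subscriber). Stream 'C' subscribes after 'A' and 'B' have completed, so share resubscribes to the source and interval restarts at 0, just as it did for 'A'. The state in the closure is not reset, however, so the values 'C' receives are multiplied by the counter left behind by the earlier subscribers.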

// Assumes an environment that exposes the Rx and RxOperators globals
const { interval, pipe } = Rx;
const { take, map, tap, share } = RxOperators;

const custom = () => {
  let state = 0; 
  return pipe(
    map(next => state * next),
    tap(_ => state += 1),
    share()
  )
}

// Late subscribers can get different streams
const obs = interval(500).pipe(custom())
const sub1 = obs.pipe(take(2)).subscribe((x) => console.log('A', x))
setTimeout(() => obs.pipe(take(1)).subscribe((x) => console.log('B', x)), 500)
setTimeout(() => obs.pipe(take(3)).subscribe((x) => console.log('C', x)), 3000)

Whether this is acceptable or expected behavior will depend on your use case. While it is good to try to use pure functions for all of their advantages, sometimes it isn't practical or appropriate.

rfestag answered Oct 23 '22 13:10