
Can the RxJs operator groupBy leak memory?

I'm trying to wrap my head around the use cases for the RxJs operator groupBy and I'm concerned that in certain instances it may lead to a memory leak.

I'm familiar with groupBy in the traditional sense (synchronous list processing, for example). Here is a groupBy function to refer to:

const groupBy = f => list =>
  list.reduce((grouped, item) => {
    const category = f(item);
    if (!(category in grouped)) {
      grouped[category] = [];
    }
    grouped[category].push(item);
    return grouped;
  }, {});

const oddsAndEvens = x => x % 2 === 0 ? 'EVEN' : 'ODD';

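// Minimal right-to-left composition helper, added so the snippet runs on its own.
const compose = (...fns) => x => fns.reduceRight((acc, fn) => fn(acc), x);

compose(
  console.log,
  groupBy(oddsAndEvens)
)([1,2,3,4,5,6,7,8])

// logs: { ODD: [ 1, 3, 5, 7 ], EVEN: [ 2, 4, 6, 8 ] }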

Note that this is stateless in the broader scope. I'm assuming that RxJs does something similar, where in place of the EVEN and ODD arrays it returns observables, and that it keeps track of the groups statefully in something that behaves like a set. Correct me if I'm wrong. The main point is that I think RxJs would have to maintain a stateful list of all groupings.

My question is: what happens if the number of grouping values (just EVEN and ODD in this example) is not finite? For example, consider a stream where each item carries a unique identifier that ties related items together over the life of the stream. If you were to group by this identifier, would RxJs's groupBy operator keep making more and more groups even though old identifiers will never be revisited?

MFave asked Dec 24 '22 08:12


1 Answer

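If your stream is infinite and your key selector can produce an unbounded number of distinct keys, then yes, you have a memory leak: groupBy keeps a reference to every group it has opened until the group is closed.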
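To make the leak concrete, here is a minimal sketch in plain JavaScript of the bookkeeping a push-based groupBy needs. It is a mental model only, not the RxJS implementation, and `createGrouper` is a hypothetical name introduced for illustration. Each new key adds an entry to a Map, and nothing ever removes it.

```javascript
// Hypothetical model of groupBy's bookkeeping; NOT the actual RxJS code.
function createGrouper(keySelector) {
  const groups = new Map(); // key -> group state, retained for the stream's lifetime

  return {
    // Route one incoming value to its group, opening the group if needed.
    next(value) {
      const key = keySelector(value);
      let group = groups.get(key);
      if (!group) {
        group = { key, count: 0 };
        groups.set(key, group);
      }
      group.count += 1;
      return group;
    },
    // Number of groups currently held in memory.
    size: () => groups.size,
  };
}

const byParity = createGrouper(x => (x % 2 === 0 ? 'EVEN' : 'ODD'));
const byUniqueId = createGrouper(x => `id-${x}`);

for (let i = 0; i < 1000; i++) {
  byParity.next(i);
  byUniqueId.next(i);
}

console.log(byParity.size());   // 2
console.log(byUniqueId.size()); // 1000
```

With a bounded key space the Map stays small; with a unique key per item it grows by one entry per value, which is the leak described above.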

You can avoid this by supplying a duration selector. It is called once for each new group and returns an observable; when that observable emits, the group is closed and released.

rxjs 5+: pass it as the 3rd parameter of groupBy.

rxjs 4: use the groupByUntil operator instead.
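The effect of a duration selector can be sketched in plain JavaScript as well. This is again only a model under stated assumptions, not RxJS internals: `createExpiringGrouper` is a hypothetical name, time is passed in explicitly instead of using real timers, and every group is assumed to expire a fixed `ttlMs` after it opens.

```javascript
// Hypothetical model of groupBy with a fixed per-group lifetime; NOT RxJS code.
function createExpiringGrouper(keySelector, ttlMs) {
  const groups = new Map(); // key -> { key, openedAt, count }

  return {
    // Route one value, given the current time in milliseconds.
    next(value, nowMs) {
      // Close every group whose lifetime has elapsed.
      for (const [key, group] of groups) {
        if (nowMs - group.openedAt >= ttlMs) groups.delete(key);
      }
      const key = keySelector(value);
      let group = groups.get(key);
      if (!group) {
        group = { key, openedAt: nowMs, count: 0 };
        groups.set(key, group);
      }
      group.count += 1;
      return group;
    },
    size: () => groups.size,
  };
}

// One value every 200 ms, ten values per key, groups live for 3000 ms.
const grouper = createExpiringGrouper(x => Math.floor(x / 10), 3000);
let maxOpen = 0;
for (let i = 0; i < 1000; i++) {
  grouper.next(i, i * 200);
  maxOpen = Math.max(maxOpen, grouper.size());
}

console.log(maxOpen); // 2
```

Although 100 distinct keys pass through, no more than two groups are alive at once, because each group is dropped once its lifetime ends.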

Here is an example of an infinite stream, where each of the grouped Observables is closed after 3 seconds.

Rx.Observable.interval(200)
  .groupBy(
    x => Math.floor(x / 10),
    x => x,
    x$ => Rx.Observable.timer(3000).finally(() => console.log(`closing group ${x$.key}`))
  )
  .mergeMap(x$ => x$.map(x => `group ${x$.key}: ${x}`))
  .subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.8/Rx.js"></script>
ZahiC answered Dec 25 '22 23:12