
RxJS: Observable stream piped to groupBy() followed by concatMap(); data for subsequent keys lost

Tags:

rxjs6

I am trying to use the RxJS groupBy operator followed by concatMap to collect records into individual groups based on some keys.

I have noticed that when concatMap follows a groupBy operator, it seems to lose the data for all the keys that occur after the first one.

For example, consider the following code block:

// DOES NOT WORK
import { Subject } from 'rxjs';
import { groupBy, concatMap, map } from 'rxjs/operators';
const records = ['a:1', 'b:2', 'c:3', 'd:1', 'e:2', 'f:3', 'g:1'];
const clicks = new Subject();

const result = clicks.pipe(
  groupBy(x => x.substr(2,1)),
  concatMap(ev$ => ev$.pipe(map(x => ({key: ev$.key, value: x})))),
);
const subscription = result.subscribe(x => console.log(x));

records.forEach(x => clicks.next(x));

// Expected Output:
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// { key: '2', value: 'b:2' }
// { key: '2', value: 'e:2' }
// { key: '3', value: 'c:3' }
// { key: '3', value: 'f:3' }
// 
// Actual Output:
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// ...Nothing more -- no results for key 2 and 3

However, when I use the concatMap operator on its own, it behaves as expected.

// WORKS
import { Subject, interval } from 'rxjs';
import { concatMap, take, map } from 'rxjs/operators';
const records = ['a', 'b', 'c', 'd', 'e', 'f', 'g'];
const clicks = new Subject();

const result = clicks.pipe(
  concatMap(ev => ev.subject$.pipe(take(4), map(x => ev.key + x))),
);
const subscription = result.subscribe(x => console.log(x));

records.forEach(x => clicks.next({key: x, subject$: interval(1000)}));

// Expected & Actual Output:
// a0
// a1
// a2
// a3
// b0
// b1
// b2
// b3
// c0
// c1
// c2
// c3
// d0
// d1
// d2
// d3
// e0
// e1
// e2
// e3
// f0
// f1
// f2
// f3
// g0
// g1
// g2
// g3

Reading through the documentation for the RxJS groupBy and concatMap operators gives me no clues as to what could be going on here, while the section on concatMap at reactivex.io leads me to believe that this should work.

Can anyone help me understand what's going on with the first scenario here? How can I get the first scenario to work?

Kiran asked Sep 28 '18 20:09


1 Answer

I finally seem to have figured out what the issue is here.

In Scenario #1 in the question above, the code pipes the source stream first into the groupBy operator and then into the concatMap operator. This combination of operators seems to be causing the problem.

Inner workings of groupBy and mergeMap

Reading through the code for the groupBy operator, I realized that groupBy internally creates a new Subject instance for each key that is found in the source stream. All values belonging to that key are then immediately emitted by that Subject instance.

All the Subject instances are wrapped in GroupedObservable instances and emitted downstream by the groupBy operator. This stream of GroupedObservable instances is the input to the concatMap operator.

The concatMap operator internally calls the mergeMap operator with a concurrency of 1, which means that only one inner observable is subscribed to at a time.

The mergeMap operator subscribes to only as many observables as the concurrency parameter allows (here, one) and holds all other observables in a buffer until the active one has completed.

How does this create the problem?

Firstly, now that I have read through the code for these operators, I am not too sure if this is a "problem".

Nevertheless, the behavior I described in the question occurs because the groupBy operator emits each value through the corresponding Subject instance immediately, while the mergeMap operator has not yet subscribed to that particular Subject (it is still waiting for the first group to complete). A plain Subject does not buffer values, so everything emitted through it before mergeMap subscribes is lost.
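The mechanism described above can be modelled without RxJS. The sketch below is only an illustration: `HotSubject` and `concatOneAtATime` are hypothetical stand-ins written for this example, not the library's implementations, and the loop plays the role of groupBy by handing each new group downstream before emitting the value into it.

```javascript
// Hypothetical stand-in for a plain Subject: no buffering, so a value
// reaches only the observers subscribed at the moment it is emitted.
class HotSubject {
  constructor() {
    this.observers = [];
    this.done = false;
  }
  subscribe(observer) {
    if (this.done) {
      observer.complete(); // a finished subject only reports completion
      return;
    }
    this.observers.push(observer);
  }
  next(value) {
    this.observers.forEach(o => o.next(value));
  }
  complete() {
    this.done = true;
    this.observers.forEach(o => o.complete());
  }
}

// Hypothetical stand-in for concatMap: queue incoming sources and subscribe
// to the next one only after the current one completes (concurrency of 1).
function concatOneAtATime(onValue) {
  const queue = [];
  let active = false;
  const subscribeNext = () => {
    if (queue.length === 0) {
      active = false;
      return;
    }
    active = true;
    queue.shift().subscribe({ next: onValue, complete: subscribeNext });
  };
  return source => {
    queue.push(source);
    if (!active) subscribeNext();
  };
}

const seen = [];
const enqueue = concatOneAtATime(v => seen.push(v));
const groups = new Map();

for (const record of ['a:1', 'b:2', 'c:3', 'd:1', 'e:2', 'f:3', 'g:1']) {
  const key = record.charAt(2);
  if (!groups.has(key)) {
    groups.set(key, new HotSubject());
    enqueue(groups.get(key)); // the new group is handed downstream first...
  }
  groups.get(key).next(record); // ...and the value is emitted into it right away
}
groups.forEach(group => group.complete());

console.log(seen); // [ 'a:1', 'd:1', 'g:1' ]
```

Groups 2 and 3 are eventually subscribed to in this model, but by then their values have already been emitted to nobody.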

I have tried to illustrate this problem in a rough marble diagram (image: "groupBy to concatMap issue").

This is not a "problem" with the way these operators work, but perhaps with the way I understood them and possibly with the documentation (particularly the documentation for concatMap, which may be a bit confusing for people new to RxJS).

This can be easily fixed by getting the groupBy operator to use a ReplaySubject instead of a Subject to emit the grouped values. The groupBy operator accepts a subjectSelector parameter (its fourth argument) that allows us to replace the Subject instance with a ReplaySubject instance.

The following code works:

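// THIS VERSION WORKS
import { Subject, ReplaySubject } from 'rxjs';
import { groupBy, concatMap, map } from 'rxjs/operators';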
const records = ['a:1', 'b:2', 'c:3', 'd:1', 'e:2', 'f:3', 'g:1'];
const clicks = new Subject();

const result = clicks.pipe(
  groupBy(x => x.substr(2,1), null, null, () => new ReplaySubject()),
  concatMap(ev$ => ev$.pipe(map(x => ({key: ev$.key, value: x})))),
);
const subscription = result.subscribe(x => console.log(x));

records.forEach(x => clicks.next(x));

// We also need to explicitly complete() the source
// stream so that the first GroupedObservable
// completes, which allows the concatMap operator
// to move on to the second GroupedObservable.
clicks.complete();

// Expected and Actual output
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// { key: '2', value: 'b:2' }
// { key: '2', value: 'e:2' }
// { key: '3', value: 'c:3' }
// { key: '3', value: 'f:3' }
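As a rough illustration of why the ReplaySubject makes the difference, the sketch below models replay behaviour without RxJS. `TinyReplaySubject` is a hypothetical stand-in written for this example; it is assumed to buffer every value with no size or time limit, which is how an RxJS ReplaySubject behaves when constructed with no arguments.

```javascript
// Hypothetical stand-in for a ReplaySubject with an unbounded buffer.
class TinyReplaySubject {
  constructor() {
    this.buffer = [];
    this.observers = [];
  }
  subscribe(next) {
    this.buffer.forEach(value => next(value)); // replay everything emitted so far
    this.observers.push(next);
  }
  next(value) {
    this.buffer.push(value);
    this.observers.forEach(observer => observer(value));
  }
}

const group = new TinyReplaySubject();
group.next('b:2'); // nobody is subscribed yet, but the value is buffered
group.next('e:2');

const late = [];
group.subscribe(value => late.push(value)); // the late subscriber still gets both

console.log(late); // [ 'b:2', 'e:2' ]
```

The trade-off is memory: an unbounded buffer holds on to every value it has seen, which may matter for long-lived or high-volume streams.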

Why does Scenario 2 work?

Scenario 2 in my question works because interval only creates a cold Observable; it does not start emitting values until something subscribes to it. Hence, no values have been missed by the time mergeMap finally subscribes to it.
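The contrast with the grouped Subjects can also be sketched without RxJS. `coldCounter` below is a hypothetical helper for this illustration (it emits synchronously rather than on a timer, unlike interval): production starts inside subscribe, so a subscriber that arrives late still receives the full sequence.

```javascript
// Hypothetical cold source: nothing runs until subscribe() is called,
// and every subscriber triggers its own independent run.
function coldCounter(count) {
  return {
    subscribe(next) {
      for (let i = 0; i < count; i++) next(i);
    },
  };
}

const source = coldCounter(4); // created now, but no values produced yet

const first = [];
const second = [];
source.subscribe(v => first.push(v));
source.subscribe(v => second.push(v)); // subscribes later, still starts from 0

console.log(first);  // [ 0, 1, 2, 3 ]
console.log(second); // [ 0, 1, 2, 3 ]
```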

Kiran answered Sep 17 '22 04:09