I am trying to use the RxJS groupBy operator followed by concatMap to collect records into individual groups based on some keys.
I have noticed that when concatMap follows a groupBy operator, it seems to lose the data for all the keys that occur after the first one.
For example, consider the following code block:
// DOES NOT WORK
import { Subject } from 'rxjs';
import { groupBy, concatMap, map } from 'rxjs/operators';

const records = ['a:1', 'b:2', 'c:3', 'd:1', 'e:2', 'f:3', 'g:1'];
const clicks = new Subject();
const result = clicks.pipe(
  // Group each record by the character after the colon ('1', '2' or '3').
  groupBy(x => x.substr(2, 1)),
  concatMap(ev$ => ev$.pipe(map(x => ({ key: ev$.key, value: x })))),
);
const subscription = result.subscribe(x => console.log(x));
records.forEach(x => clicks.next(x));
// Expected Output:
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// { key: '2', value: 'b:2' }
// { key: '2', value: 'e:2' }
// { key: '3', value: 'c:3' }
// { key: '3', value: 'f:3' }
//
// Actual Output:
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// ...Nothing more -- no results for key 2 and 3
However, when I use the concatMap operator on its own, it behaves as expected.
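// WORKS
import { Subject, interval } from 'rxjs';
import { concatMap, take, map } from 'rxjs/operators';

const records = ['a', 'b', 'c', 'd', 'e', 'f', 'g'];
const clicks = new Subject();
const result = clicks.pipe(
  // Each inner interval is subscribed only after the previous one completes.
  concatMap(ev => ev.subject$.pipe(take(4), map(x => ev.key + x))),
);
const subscription = result.subscribe(x => console.log(x));
records.forEach(x => clicks.next({ key: x, subject$: interval(1000) }));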
// Expected & Actual Output:
// a0
// a1
// a2
// a3
// b0
// b1
// b2
// b3
// c0
// c1
// c2
// c3
// d0
// d1
// d2
// d3
// e0
// e1
// e2
// e3
// f0
// f1
// f2
// f3
// g0
// g1
// g2
// g3
Reading through the documentation for RxJS groupBy and concatMap does not give me any clues as to what could be going on here, whereas the section on RxJS concatMap at reactivex.io leads me to believe that this should work.
Can anyone help me understand what's going on with the first scenario here? How can I get the first scenario to work?
I finally seem to have figured out what the issue is here.
In Scenario #1 in the question above, the code pipes the source stream first into the groupBy operator and then into the concatMap operator. This combination of operators seems to be causing the problem.
groupBy and mergeMap
Reading through the code for the groupBy operator, I realized that groupBy internally creates a new Subject instance for each key that is found in the source stream. All values belonging to that key are then immediately emitted by that Subject instance.
All the Subject instances are wrapped into GroupedObservable instances and emitted downstream by the groupBy operator. This stream of GroupedObservable instances is the input to the concatMap operator.
The concatMap operator internally calls the mergeMap operator with a concurrency value of 1, which means that only one inner observable is subscribed to at a time.
The mergeMap operator subscribes to only as many observables as the concurrency parameter allows (here, one) and holds all other observables in a "buffer" until one of the active ones has completed.
Firstly, now that I have read through the code for these operators, I am not too sure if this is a "problem".
Nevertheless, the behavior I described in the question occurs because the groupBy operator emits each value through the corresponding Subject instance immediately, while the mergeMap operator has not yet subscribed to that particular Subject. Hence, all values from the source stream that are emitted through that Subject before it is subscribed to are lost.
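The mechanism described above can be sketched without RxJS at all. The snippet below is a toy model under my own assumptions, not the library's code: toyGroup and subscribeWithLimit are hypothetical helpers that mimic, respectively, a group's Subject (values emitted with no subscriber are dropped) and mergeMap's concurrency limit (extra groups wait in a queue).

```javascript
// Toy stand-in for a group's Subject (hypothetical, not RxJS code):
// values emitted while nobody is subscribed are simply dropped.
function toyGroup(key) {
  let observer = null;
  return {
    key,
    subscribe(next, complete) { observer = { next, complete }; },
    emit(value) { if (observer) observer.next(value); },
    finish() { if (observer) observer.complete(); },
  };
}

// Toy stand-in for mergeMap's concurrency handling (hypothetical helper):
// subscribe to at most `concurrency` groups and queue the rest.
function subscribeWithLimit(groups, concurrency, log) {
  const queue = [...groups];
  let active = 0;
  const startNext = () => {
    while (active < concurrency && queue.length > 0) {
      const group = queue.shift();
      active++;
      group.subscribe(
        value => log.push(`key ${group.key}: ${value}`),
        () => { active--; startNext(); },
      );
    }
  };
  startNext();
}

const group1 = toyGroup('1');
const group2 = toyGroup('2');
const deliveryLog = [];
subscribeWithLimit([group1, group2], 1, deliveryLog); // concurrency 1, like concatMap

group1.emit('a:1'); // delivered: group 1 is subscribed
group2.emit('b:2'); // dropped: group 2 is still waiting in the queue
group1.emit('d:1'); // delivered
group1.finish();    // only now does group 2 get subscribed
group2.emit('e:2'); // delivered

console.log(deliveryLog);
// deliveryLog is ['key 1: a:1', 'key 1: d:1', 'key 2: e:2']
```

In the code from the question the first group never finishes, because the source Subject is never completed, so the later groups are never subscribed to at all.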
I have tried to illustrate this problem in a rough marble diagram:
This is not a "problem" with the way these operators work, but perhaps with the way I understood these operators and possibly with the documentation (particularly the documentation for concatMap, which may be a bit confusing for folks new to RxJS).
This can be easily fixed by getting the groupBy operator to use a ReplaySubject instead of a Subject to emit the grouped values. The groupBy operator accepts a subjectSelector parameter that allows us to replace the Subject instance with a ReplaySubject instance.
The following code works:
// THIS VERSION WORKS
import { Subject, ReplaySubject } from 'rxjs';
import { groupBy, concatMap, map } from 'rxjs/operators';

const records = ['a:1', 'b:2', 'c:3', 'd:1', 'e:2', 'f:3', 'g:1'];
const clicks = new Subject();
const result = clicks.pipe(
  // The fourth argument (subjectSelector) makes each group replay its values.
  groupBy(x => x.substr(2, 1), null, null, () => new ReplaySubject()),
  concatMap(ev$ => ev$.pipe(map(x => ({ key: ev$.key, value: x })))),
);
const subscription = result.subscribe(x => console.log(x));
records.forEach(x => clicks.next(x));
// We also need to explicitly complete() the source
// stream to ensure that the observable stream for
// the first GroupedObservable completes, allowing
// the concatMap operator to move to the second
// GroupedObservable.
clicks.complete();
// Expected and Actual output
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// { key: '2', value: 'b:2' }
// { key: '2', value: 'e:2' }
// { key: '3', value: 'c:3' }
// { key: '3', value: 'f:3' }
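As a rough illustration of why the ReplaySubject makes the difference, here is a dependency-free sketch. ToySubject and ToyReplaySubject are hypothetical, heavily simplified stand-ins that I wrote for this example, not the RxJS classes; the only behavior they model is what happens to values emitted before anyone subscribes.

```javascript
// Simplified stand-in for Subject: no memory of past values.
class ToySubject {
  constructor() { this.observers = []; }
  subscribe(fn) { this.observers.push(fn); }
  next(value) { this.observers.forEach(fn => fn(value)); }
}

// Simplified stand-in for ReplaySubject: buffers every value and
// replays the buffer to each new subscriber.
class ToyReplaySubject extends ToySubject {
  constructor() { super(); this.buffer = []; }
  subscribe(fn) {
    this.buffer.forEach(value => fn(value)); // replay past values first
    super.subscribe(fn);
  }
  next(value) {
    this.buffer.push(value);
    super.next(value);
  }
}

// Emit one value before subscribing and one after, then report what arrived.
function lateSubscribe(subject) {
  const seen = [];
  subject.next('b:2');                  // emitted before anyone subscribes
  subject.subscribe(v => seen.push(v)); // the "late" subscriber
  subject.next('e:2');                  // emitted after subscribing
  return seen;
}

const plainResult = lateSubscribe(new ToySubject());
const replayResult = lateSubscribe(new ToyReplaySubject());
console.log(plainResult);  // plainResult is ['e:2']
console.log(replayResult); // replayResult is ['b:2', 'e:2']
```

I believe newer RxJS versions (7 and later) express the same idea through a connector factory in an options object passed to groupBy, rather than through the fourth positional argument; check the groupBy documentation for the version you use.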
Scenario 2 in my question works fine because interval just creates an Observable but does not start emitting values until it is subscribed to. Hence, all values from that Observable are available when mergeMap finally subscribes to it.
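The same point in a minimal sketch: coldCounter below is a hypothetical stand-in for a source like interval (synchronous and without timers, purely for illustration). Its producer runs only inside subscribe, so a subscriber that arrives late still gets every value.

```javascript
// Hypothetical stand-in for a cold source: nothing is produced until
// subscribe() is called, and each subscriber gets the full sequence.
function coldCounter(count) {
  return {
    subscribe(next) {
      for (let i = 0; i < count; i++) next(i);
    },
  };
}

const coldSource = coldCounter(4);
// No values exist yet, no matter how long we wait before subscribing.
const received = [];
coldSource.subscribe(v => received.push('a' + v));
console.log(received);
// received is ['a0', 'a1', 'a2', 'a3']
```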