
RxJS: Elegant way to partition a source Observable into 3 or more Observables

I have a socket connection that emits messages with an identifier. I would like to create a separate observable for each type of message. I have a few solutions but they all feel clunky or have possible performance issues. I'm relatively new to RxJS so I'm not aware of the possible traps I might be walking into.

My first instinct was to create a filtered observable for each type:

const receive_message = Rx.fromEvent(socket, 'data').pipe(share());
const message_type_a = receive_message.pipe(filter(message => message.type === 'a'));
const message_type_b = receive_message.pipe(filter(message => message.type === 'b'));
const message_type_c = receive_message.pipe(filter(message => message.type === 'c'));
const message_type_d = receive_message.pipe(filter(message => message.type === 'd'));

I think this would cause performance issues because it's performing this check for every message type every time any message comes in.

I thought about doing a multistage partition like this:

const receive_message = Rx.fromEvent(socket, 'data');
const [message_type_a, not_a] = receive_message.pipe(partition(message => message.type === 'a'));
const [message_type_b, not_b] = not_a.pipe(partition(message => message.type === 'b'));
const [message_type_c, message_type_d] = not_b.pipe(partition(message => message.type === 'c'));

This is awfully clunky and I'm not sure if it is any more performant than the filter solution.

Next I tried using subjects like so:

const message_type_a = new Rx.Subject();
const message_type_b = new Rx.Subject();
const message_type_c = new Rx.Subject();
const message_type_d = new Rx.Subject();

Rx.fromEvent(socket, 'data').subscribe(function (message) {
    switch (message.type) {
      case 'a':
        message_type_a.next(message);
        break;
      case 'b':
        message_type_b.next(message);
        break;
      case 'c':
        message_type_c.next(message);
        break;
      case 'd':
        message_type_d.next(message);
        break;
      default:
        console.log('Uh oh');
    }
  },
  console.log,
  function () {
    message_type_a.complete();
    message_type_b.complete();
    message_type_c.complete();
    message_type_d.complete();
  }
);

Again, this is clunky and whenever I'm using subjects I ask myself if this is the "Rx" way of doing things.

Ideally I would be able to do something like this:

const [
  message_type_a,
  message_type_b,
  message_type_c,
  message_type_d
] = Rx.fromEvent(socket, 'data').pipe(partitionMany(message => message.type));

Are there any elegant solutions out there or is my overall approach of splitting the source observable like this fundamentally flawed?

This is my first question so I hope I did a good job. Thanks in advance!

asked Jun 19 '18 at 14:06 by mikermcnally


2 Answers

I changed your switch-case solution to a more performant one that uses a lookup table instead of a switch.

const message_type_a = new Rx.Subject();
const message_type_b = new Rx.Subject();
const message_type_c = new Rx.Subject();
const message_type_d = new Rx.Subject();

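const subjects = {
    'a': message_type_a,
    'b': message_type_b,
    'c': message_type_c,
    'd': message_type_d
};

Rx.fromEvent(socket, 'data').pipe(
  tap(message => {
    const subject = subjects[message.type];
    // Ignore unknown types instead of throwing inside tap
    if (subject) {
      subject.next(message);
    }
  })
).subscribe();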
answered Oct 05 '22 at 22:10 by Hikmat G.


See https://www.npmjs.com/package/rx-splice.

We had the exact same situation, and in our case it was indeed a performance problem (measured using node --perf). I just created this package after reading your question, because sharing is caring. Let me know if it works for you!

Note that you want this only if executing the filter's selector function becomes a problem! As noted in the splice README:

Using only idiomatic RxJS code, one would use filter instead for the use case of splice. However, if you are writing high-performance code and this input$ Observable above (or, more likely, Subject) is subscribed to hundreds or thousands of times (X), the selector function of filter(fn) is called X times for every emitted value. This can be, and actually did prove to be, the biggest performance bottleneck in our application, so we wrote splice, which executes its indexing selector only once for each emitted value.

answered Oct 06 '22 at 00:10 by Herman