
Joining two streams of observables in RxJS according to specific conditions

I have two streams of objects: accounts and balances.

I need to merge (join) the two streams by matching each account's id to the balance's account_id.

var accounts = Rx.Observable.from([
    { id: 1, name: 'account 1' },
    { id: 2, name: 'account 2' },
    { id: 3, name: 'account 3' },
]);

var balances = Rx.Observable.from([
    { account_id: 1, balance: 100 },
    { account_id: 2, balance: 200 },
    { account_id: 3, balance: 300 },
]);

What is expected:

var results = [
    { id: 1, name: 'account 1', balance: 100},
    { id: 2, name: 'account 2', balance: 200},
    { id: 3, name: 'account 3', balance: 300},
];

Is this feasible with RxJS?

To be clear, I know how to do this with plain JS, lodash, or something similar. In my case I am getting these streams from the Angular Http module, so I am asking whether RxJS offers any benefit here.

asked Apr 25 '17 06:04 by amd



1 Answer

According to one of your comments, your example simulates a stream coming from an Angular Http call.

So instead of :

var accounts = Rx.Observable.from([
    { id: 1, name: 'account 1' },
    { id: 2, name: 'account 2' },
    { id: 3, name: 'account 3' },
]);

var balances = Rx.Observable.from([
    { account_id: 1, balance: 100 },
    { account_id: 2, balance: 200 },
    { account_id: 3, balance: 300 },
]);

I'd rather say that it is :

var accounts = Rx.Observable.of([
    { id: 1, name: 'account 1' },
    { id: 2, name: 'account 2' },
    { id: 3, name: 'account 3' },
]);

var balances = Rx.Observable.of([
    { account_id: 1, balance: 100 },
    { account_id: 2, balance: 200 },
    { account_id: 3, balance: 300 },
]);

Why: from emits the items of the array one by one, whereas of emits the entire array as a single value, and I guess your HTTP response is the whole array.

That said, what you probably want to achieve is :

const { Observable } = Rx;

// simulate HTTP requests
const accounts$ = Rx.Observable.of([
  { id: 1, name: 'account 1' },
  { id: 2, name: 'account 2' },
  { id: 3, name: 'account 3' }
]);

const balances$ = Rx.Observable.of([
  { account_id: 1, balance: 100 },
  { account_id: 2, balance: 200 },
  { account_id: 3, balance: 300 }
]);

// utils
const joinArrays = (accounts, balances) =>
  accounts
    .map(account => Object.assign({}, account, { balance: findBalanceByAccountId(balances, account.id).balance }));

const findBalanceByAccountId = (balances, id) =>
  balances.find(balance => balance.account_id === id) || { balance: 0 };

const print = (obj) => JSON.stringify(obj, null, 2)

// use forkJoin to start both observables at the same time and not wait between every request
Observable
  .forkJoin(accounts$, balances$)
  .map(([accounts, balances]) => joinArrays(accounts, balances))
  .do(rslt => console.log(print(rslt)))
  .subscribe();

Output :

[
  {
    "id": 1,
    "name": "account 1",
    "balance": 100
  },
  {
    "id": 2,
    "name": "account 2",
    "balance": 200
  },
  {
    "id": 3,
    "name": "account 3",
    "balance": 300
  }
]

Here's a working Plunkr : https://plnkr.co/edit/bc0YHrISu3FT45ftIFwz?p=preview

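EDIT 1: Searching an array to compose your result is probably not the best idea for performance, because find scans the balances once per account. Instead of returning an array, maybe try to return an object whose keys are the account IDs. This way you can simply remove the findBalanceByAccountId function and have a faster app. Note that, unlike findBalanceByAccountId, this version has no fallback and throws if an account has no balance entry (only the modified code is shown here):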

const balances$ = Rx.Observable.of({
  1: { account_id: 1, balance: 100 },
  2: { account_id: 2, balance: 200 },
  3: { account_id: 3, balance: 300 }
});

// utils
const joinArrays = (accounts, balances) =>
  accounts
    .map(account => Object.assign(
      {}, 
      account, 
      { balance: balances[account.id].balance }
    ));
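If the server cannot return the balances keyed by ID, a similar lookup can be built on the client before joining. The following is only a sketch in plain JavaScript: indexByAccountId and joinWithIndex are hypothetical names, and the fallback to a balance of 0 mirrors the behaviour of findBalanceByAccountId from the first version.

```javascript
// Hypothetical helper: index the balances by account_id for constant-time lookups.
const indexByAccountId = (balances) =>
  new Map(balances.map(balance => [balance.account_id, balance]));

// Join accounts with balances, defaulting to 0 when an account has no balance.
const joinWithIndex = (accounts, balances) => {
  const byAccountId = indexByAccountId(balances);
  return accounts.map(account => {
    const match = byAccountId.get(account.id);
    return Object.assign({}, account, { balance: match ? match.balance : 0 });
  });
};

const sampleAccounts = [
  { id: 1, name: 'account 1' },
  { id: 2, name: 'account 2' },
  { id: 4, name: 'account 4' } // illustrative account with no matching balance
];

const sampleBalances = [
  { account_id: 1, balance: 100 },
  { account_id: 2, balance: 200 }
];

const joined = joinWithIndex(sampleAccounts, sampleBalances);
console.log(JSON.stringify(joined, null, 2));
```

joinWithIndex takes the same two arguments as joinArrays, so it could be passed to the map step after forkJoin in its place.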
answered Sep 23 '22 02:09 by maxime1992