RxJS: Combining historical data with a stream of updates

Scenario:

I'm loading an initial array of data via a simple AJAX call and putting that data into an Observable, which I'll call historical. In parallel, I connect to a websocket and periodically receive data, which we'll call updates, and I want to append this data to historical.

Concretely, let's say the ajax call sends back the array [0,1,2] and the socket emits (over time) 3, 4, 5 then I want to accumulate these values like so:

[0,1,2]        // historical
[0,1,2,3]      // historical + updates1
[0,1,2,3,4]    // historical + updates1 + updates2
[0,1,2,3,4,5]  // etc

(Note that there's a concurrency edge case here that has to be handled: historical might yield [0,1,2,3] while the first two updates are 3 and 4, in which case what I want to end up with is still [0,1,2,3,4], NOT [0,1,2,3,3,4].)
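To pin down that dedup rule, here's a minimal plain-JS sketch of the intended semantics (the mergeWithUpdates helper is hypothetical, just to illustrate what the combined result should look like):

```javascript
// Merge buffered updates into history, skipping any update value
// that history already contains (the concurrency edge case above).
function mergeWithUpdates(history, updates) {
  const seen = new Set(history);
  const result = history.slice();
  for (const u of updates) {
    if (!seen.has(u)) { // drop values already present in history
      seen.add(u);
      result.push(u);
    }
  }
  return result;
}

console.log(mergeWithUpdates([0, 1, 2, 3], [3, 4])); // [0, 1, 2, 3, 4]
```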

The end goal is to end up with a single Observable stream, which is the combination of the Observables historical and updates as described.

What I've tried so far:

Accumulating just the websocket data is easy enough. I create updates, which is the Observable sequence emitted by the websocket. Each time a value is observed, I can accumulate it into array, using scan():

updates.scan((acc, update) => acc.concat([update]), [])

This would yield something like

[3]
[3,4]
[3,4,5]

My next issue was how to combine it with historical. And since historical's data might arrive after one or more updates have already been observed, those updates need to be accumulated while we wait for historical. I managed to achieve this using withLatestFrom():

const stream = historical
  .withLatestFrom(
    updates.scan((acc, update) => acc.concat([update]), []),
    (history, buffer) => history.concat(buffer) /* could eliminate duplicates here */
  )

Observing stream yields a single value, [0,1,2,3,4,5], which is the combination of historical and any updates that arrived before historical. Just what I wanted.

However, I can't figure out where to go from there. How can I continue to append updates to stream so that, over time, stream yields something like:

[0,1,2,3,4,5]
[0,1,2,3,4,5,6]
[0,1,2,3,4,5,6,7]

I don't see a way to use scan for this, the way I did for updates, because in this case I need scan's initial (seed) value to be an Observable, not an Array.

Is there a way to do that — either by adding to what I have so far or an even better alternate way to do the whole thing?
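To make the desired end-to-end behavior concrete, it can be simulated as a pure fold over one interleaved sequence of events from both sources (the {type, value} event shape below is my own invention for illustration, not any RxJS API):

```javascript
// Fold an interleaved event sequence into a list of emitted states:
// - an 'update' event appends its value unless already present
// - a 'historical' event prepends history, dropping buffered duplicates
function foldEvents(events) {
  const states = [];
  let acc = [];
  for (const e of events) {
    if (e.type === 'historical') {
      acc = e.value.concat(acc.filter(v => !e.value.includes(v)));
    } else if (!acc.includes(e.value)) {
      acc = acc.concat([e.value]);
    }
    states.push(acc);
  }
  return states;
}

const out = foldEvents([
  { type: 'update', value: 3 },
  { type: 'historical', value: [0, 1, 2, 3] },
  { type: 'update', value: 4 },
]);
console.log(out[out.length - 1]); // [0, 1, 2, 3, 4]
```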

meetamit asked Feb 14 '17

1 Answer

If I understand you correctly, I'd use the skipUntil() operator to keep collecting the updates without emitting them further. Then I'd choose the updates Observable as the source for the withLatestFrom() operator instead. Thanks to skipUntil(), this waits until the historical data is available and then emits on every emission from updates.

// Simulated websocket: emits 0, 1, 2, ... every second and
// accumulates each value into a growing array via scan().
let updates = Observable
  .timer(0, 1000)
  .scan((acc, update) => {
    acc.push(update);
    return acc;
  }, []);

// Simulated AJAX call that responds after 3 seconds; share() lets
// both subscribers below reuse a single request.
let historical = Observable.defer(() => {
    console.log('Sending AJAX request ...');
    return Observable.of(['h1', 'h2', 'h3']);
  })
  .delay(3000)
  .share();


const stream = updates.skipUntil(historical)
  .withLatestFrom(historical, (buffer, history) => {
    return history.concat(buffer);
  })
  .map(val => val); // duplicates could be removed here


stream.subscribe(val => console.log(val));

Output in console is the following:

Sending AJAX request ...
["h1", "h2", "h3", 0, 1, 2, 3]
["h1", "h2", "h3", 0, 1, 2, 3, 4]
["h1", "h2", "h3", 0, 1, 2, 3, 4, 5]

See live demo: https://jsbin.com/kumolez/11/edit?js,console

I don't know what your use case is, but I'd try to avoid using concat() because it may get slow as the buffer grows.

Also, if you were emitting the updates one item at a time as they arrive (instead of accumulating them), you could use the distinct() operator to filter out duplicates.
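As a plain-JS illustration of what distinct() would do on such a per-item stream (this is just the dedup logic, not RxJS code):

```javascript
// Keep only the first occurrence of each value, mimicking distinct()
// applied to a stream of individual update values.
function dedupeStream(values) {
  const seen = new Set();
  return values.filter(v => {
    if (seen.has(v)) return false;
    seen.add(v);
    return true;
  });
}

console.log(dedupeStream([3, 3, 4, 5, 4])); // [3, 4, 5]
```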

Btw, I'm assuming you're using RxJS 5.

martin answered Oct 31 '22