Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

rxjs observe array push

I want to monitor when an object is pushed onto an array using observables. I want to start with an empty array and when a push occurs I want the observable to detect and process it and then wait till the next push. This would be very similar to "fromEvent" where the observable waits for the event. The code below immediately calls completed() because the array is empty, how do I make it wait for a push?

    var testArray = [];

    test(){
      var o = {timestamp: new Date()}
      testArray.push(o)
    }

    var o = Observable
        .from(testArray)
        .concatMap( x => {
          return x;
    });

    o.subscribe( 
      x => { console.log("onNext x=",x.timestamp) },
      e => console.log('onError:', e),
      () => {console.log('onCompleted');} );

Note: The input mechanism does not have to be an array. Any type of message queue object will work for me.

like image 864
Marty B Avatar asked Feb 10 '17 22:02

Marty B


2 Answers

If all you're trying to do is create an Observable that you can 'push' values into, I recommend using an RXJS Subject.

i.e.

const date$ = new Rx.Subject();
date$.next(new Date());

Now you have an Observable stream of Date objects that you can "push" to with the next() method.

If you really need to have an intermediate (non-Observable) data type for your queue, then I recommend using a new ES6 feature, proxies.

const queue = new Proxy([], {
  set: function(obj, prop, value) {
    if (!isNaN(prop)) {
      date$.next(value)
    }
    obj[prop] = value
    return true
  },
})

Now you have an array that is proxied so that any time a value is added to it, it will be added to your Observable stream.

like image 174
Brandon Ramirez Avatar answered Oct 01 '22 03:10

Brandon Ramirez


You could subclass Array and implement some kind of notification mechanism to tell you when pushes happen (this is really bare bones):

class CustomArray extends Array {
  push(e) {
    super.push(e)
    if (this._listeners) {
      this._listeners.forEach(l => l(e))
    }
  }
  addPushListener(listener) {
    this._listeners = this._listeners || []
    this._listeners.push(listener)
  }
  removePushListener(listener) {
    if (this._listeners) {
      const index = this._listeners.indexOf(listener)
      if (index >= 0) {
        this._listeners.splice(index, 1)
      }
    }
  }
}

Then with a function you could wrap this into an Observable

const observePushes = array => Rx.Observable.fromEventPattern(
  array.addPushListener.bind(array),
  array.removePushListener.bind(array)
)

Then you would be able to subscribe to changes and unsubscribe whenever you want, like with any other observable.

const arr = new CustomArray()
const pushObservable = observePushes(arr)

const subscription = pushObservable.subscribe(e => console.log(`Added ${e}`))

arr.push(1)
arr.push(2)
arr.push(3)
arr.push("a")

subscription.dispose()

arr.push("b")

Also mind that this Observable never really completes, since at no point in time can you guarantee that nothing more will be added to an array.

A fiddle: http://jsfiddle.net/u08daxdv/1/

like image 35
Balázs Édes Avatar answered Oct 01 '22 03:10

Balázs Édes