Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How can I get the single latest value from an infinite RxJs stream that is not the initial value?

The concept

This is a mocked angular2 project.

When consuming the observable stream from the redux store I tried to filter first and then take/takeLast/last the latest value. After that I want to resolve the promise when the stream completes but it does not when using takeLast operator.

So the question is: What operator setup can I use to get the latest value from the stream?

The setup

I simplified my Angular 2 setup to this gist of RxJs usage.

  • source observable is managed by redux library and is not completed
  • service is providing some logic to retrieve the latest value from the stream
  • component is consuming value promise style

Here is a working example: https://fiddle.jshell.net/markus_falk/an41z6g9/

The redux store mock:

var latestTime$ = new Rx.Subject();
setInterval(function(){
     latestTime$.onNext(Date.now()); 
}, 2000);

The service injectable mock:

var timeStore = null;
var getLatestTime = function() {

  return new Promise((resolve, reject) => {

     latestTime$

     /* 
        filter out 'null' for when the button is clicked
        before the store updates the first time
      */
     .filter(function(x) {
        console.log('filter: ', x);
        return x === typeof('number');
     })

     // try to end to stream by taking the last from the stream ?!?!?!?
     .takeLast(1)

     // handle promise
     .subscribe(

       function (x) {
         console.log('Next: ' + x);
         // store latest stream value
         timeStore = x;
       },
       function (err) {
         console.log('Error: ' + err);
         reject(err)
       },
       function () {
         console.log('Completed');
         // pass on latest value of endless when stream completes 
         resolve(timeStore);
       }

    );

  });

};

And a consuming mock component:

document.querySelector("#foo").addEventListener("click", function(event) {

  var time = getLatestTime();

  time.then((latestTime) => {
    console.log('latestTime: ', latestTime);
  });

  time.catch((err) => {
    console.log('oh oh: ', err);
  });

}, false);
like image 229
Markus Avatar asked Sep 29 '16 07:09

Markus


1 Answers

This should simulate your situation.

See live demo: https://jsfiddle.net/usualcarrot/zh07hfrc/1/

var subject = new Rx.Subject();

subject.skip(1).last().subscribe(function(val) {
  console.log('next:', val);
}, function(val) {
  console.log('error:', val);
}, function() {
  console.log('completed');
});

subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
subject.onNext(4);
subject.onNext(5);
subject.onCompleted();

This prints to console:

next: 5
completed

Instead of console.log('completed'); you'd put the resolve(...). Maybe this is not even necessary and you can use just return the Subject and subscribe to it as well (?) depending on your use case. In that case use asObservable() to hide the fact you're using a Subject. See similar use-case with asObservable().

like image 164
martin Avatar answered Oct 11 '22 23:10

martin