Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Create an Observable that delays the next value

I'm trying to create an observable using RxJS that does what is pictured.

Expected observable mapping

  • Grabs a value and waits a fixed period of time before getting the next one.
  • The next one will be the last value emitted in the period of the wait, skipping the rest.
  • If a wait interval goes by in which no value was emitted, the next one should be grabbed immediately, as the last example of the image depicts.
like image 994
queimadus Avatar asked May 22 '14 22:05

queimadus


2 Answers

This should do the trick.

var Rx      = require('rx'),
    source  = Rx.Observable.interval(10).take(100),
    log     = console.log.bind(console);

// Wraps `source` so that a value is forwarded as soon as it arrives, after
// which a fixed wait window opens. Values arriving during the window overwrite
// each other; when the window closes the latest one (if any) is forwarded and a
// new window opens. If a window closes with nothing pending, the next source
// value is forwarded immediately.
Rx.Observable.create(function (observer) {

    var delayMs  = 1000,  // length of the wait window opened after each emission
        delaying = false, // true while a wait window is open
        hasValue = false, // true when a newer source value arrived during the window
        complete = false, // true when the source completed while a value was pending
        timer    = null,  // handle of the pending setTimeout, or null
        value;            // most recent value received from the source

    // Source handler: remember the value, and forward it right away unless a
    // wait window is currently open.
    function onNext (x) {
      value = x;
      if (delaying) {
        hasValue = true;
      } else {
        sendValue();
      }
    }

    // Forwards the stored value and either completes (if the source already
    // finished) or opens the next wait window.
    function sendValue () {
      // Set before notifying the observer so that a source value pushed
      // re-entrantly from inside onNext is buffered instead of sent at once.
      delaying = true;
      observer.onNext(value);
      if (complete) {
        observer.onCompleted();
      } else {
        timer = setTimeout(callback, delayMs); // exercise for the reader. Use a scheduler.
      }
    }

    // Runs when a wait window closes.
    function callback () {
      timer = null;
      if (hasValue) {
        hasValue = false;
        sendValue();
      } else {
        delaying = false;
      }
    }

    var subscription = source.subscribe(
        onNext,
        observer.onError.bind(observer),
        function () {
          if (hasValue) {
            // A value is still waiting for the window to close; sendValue
            // completes the observer after forwarding it.
            complete = true;
          } else {
            observer.onCompleted();
          }
        }
      );

    // Tear down both the source subscription and any pending timer, so that
    // unsubscribing does not leave a live timeout behind.
    return Rx.Disposable.create(function () {
      if (timer !== null) {
        clearTimeout(timer);
        timer = null;
      }
      subscription.dispose();
    });
  })
  .subscribe(log);
like image 143
cwharris Avatar answered Sep 29 '22 08:09

cwharris


Here is Christopher's solution modified into an operator.

The throttleImmediate operator stores only the latest value from the source while the observable returned by the given selector is still running. It fires the cached value, if there is one, right after each completion. It is best suited to cases where the selector has side effects (e.g. an animation).

// RxJS entry point (the `rx` package).
var Rx = require('rx');

// Demo input: interval(10) limited to its first 500 values.
var source = Rx.Observable.interval(10).take(500);

// console.log pre-bound to console so it can be passed as a bare callback.
var log = console.log.bind(console);

/**
 * Passes a source value to `selector`, subscribes to the observable it
 * returns, and forwards that observable's values downstream. While that inner
 * observable is running, only the most recent source value is kept; when the
 * inner observable completes, the kept value (if any) is passed to `selector`
 * in turn. If nothing was kept, the next source value is processed as soon as
 * it arrives.
 *
 * The resulting observable completes once the source has completed and the
 * last inner observable (including one started for a kept value) has finished.
 * Errors from the source or from an inner observable are forwarded.
 *
 * @param {Function} selector Called with a source value; must return an
 *   observable (something with a `subscribe` method).
 * @returns {Rx.Observable} The throttled observable.
 */
Rx.Observable.prototype.throttleImmediate = function (selector) {
    var source = this;

    return Rx.Observable.create(function (observer) {

        var delaying = false, // true while an inner (selector) observable is running
            hasValue = false, // true when a newer source value arrived meanwhile
            complete = false, // true when the source completed while still delaying
            inner    = new Rx.SerialDisposable(), // holds the current inner subscription
            value;            // most recent value received from the source

        function onNext (x) {
          value = x;
          if (delaying) {
            hasValue = true;
          } else {
            sendValue();
          }
        }

        function sendValue () {
          // Register the slot before subscribing: if the inner observable
          // completes synchronously and sendValue recurses, the nested call
          // replaces this slot in the right order.
          var current = new Rx.SingleAssignmentDisposable();
          inner.setDisposable(current);

          delaying = true;
          current.setDisposable(selector(value).subscribe(
            observer.onNext.bind(observer),
            observer.onError.bind(observer),
            function () {
              if (hasValue) {
                hasValue = false;
                sendValue();
              } else {
                delaying = false;
                // The source finished earlier; now that nothing is in flight
                // or pending, the result can complete.
                if (complete) {
                  observer.onCompleted();
                }
              }
            }
          ));
        }

        var subscription = source.subscribe(
            onNext,
            observer.onError.bind(observer),
            function () {
              if (delaying) {
                // Defer completion so the running inner observable (and any
                // kept value) is not cut off.
                complete = true;
              } else {
                observer.onCompleted();
              }
            }
          );

        // Unsubscribing tears down the source and the running inner observable.
        return new Rx.CompositeDisposable(subscription, inner);
      });
};

// Demo: each value accepted by throttleImmediate is re-emitted through a
// timer whose delay depends on the value's parity (500 for even values,
// 1000 for odd ones), and the results are logged.
source
  .throttleImmediate(function (data) {
    var delay = (data % 2 === 0) ? 500 : 1000;

    return Rx.Observable.timer(delay).map(function () { return data; });
  })
  .subscribe(log);

This comes in handy when applying back pressure to sources where the length of the delay is known only to the selector.

Example: Given the question's marble diagram.

Let's suppose the source is a stream of ajax responses containing HTML data to display, ajaxPages, that originated from clicks on a navbar. We want to render them along with an entry animation, animatePage, whose duration is dynamic.

ajaxPages.throttleImmediate(animatePage).subscribe();

Here we animate the pages with the values from the source, skipping all the values that are emitted during the period of animation except the latest.

In practice, what we get is a stream that ignores clicks that are shortly followed by other clicks, which would be useless to show to the user since they would animate in and immediately animate out.

like image 34
queimadus Avatar answered Sep 29 '22 08:09

queimadus