Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How can I leverage reactive extensions to do caching, without a subject?

I want to be able to fetch data from an external Api for a specific request, but when that data is returned, also make it available in the cache, to represent the current state of the application.

This solution seems to work:

var Rx = require('rx');

var cached_todos = new Rx.ReplaySubject(1);

var api = {
  refresh_and_get_todos: function() {
    var fetch_todos = Rx.Observable.fromCallback($.get('example.com/todos'));
    return fetch_todos()
      .tap(todos => cached_todos.onNext(todos));
  },
  current_todos: function() {
    return cached_todos;
  }
};

But - apparently Subjects are bad practice in Rx, since they don't really follow functional reactive programming.

What is the right way to do this in a functional reactive programming way?

like image 758
Oved D Avatar asked Feb 09 '23 13:02

Oved D


1 Answers

It is recommended not to use Subjects because there is a tendency to abuse them to inject side-effects as you have done. They are perfectly valid to use as ways of pushing values into a stream, however their scope should be tightly constrained to avoid bleeding state into other areas of code.

Here is the first refactoring, notice that you can create the source beforehand and then your api code is just wrapping it up in a neat little bow:

var api = (function() {
    var fetch_todos = Rx.Observable.fromCallback($.get('example.com/todos'))
        source = new Rx.Subject(),
        cached_todos = source
          .flatMapLatest(function() { 
              return fetch_todos(); 
          })
          .replay(null, 1)
          .refCount();

    return {
      refresh: function() {
        source.onNext(null);
      },
      current_todos: function() {
        return cached_todos;
      }
    };
})();

The above is alright, it maintains your current interface and side-effects and state have been contained, but we can do better than that. We can create either an extension method or a static method that accepts an Observable. We can then simplify even further to something along the lines of:

//Executes the function and caches the last result every time source emits
Rx.Observable.withCache = function(fn, count) {
    return this.flatMapLatest(function() {
      return fn();
    })
    .replay(null, count || 1)
    .refCount();
};

//Later we would use it like so:

var todos = Rx.Observable.fromEvent(/*Button click or whatever*/))
             .withCache(
                 Rx.Observable.fromCallback($.get('example.com/todos')),
                 1 /*Cache size*/);


todos.subscribe(/*Update state*/);
like image 135
paulpdaniels Avatar answered Apr 30 '23 06:04

paulpdaniels