Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJS: How to do some clean-up before the next valid value is passed?

I have to streams which I merge to get an URL to load an image: one stream for drop events and one for an file input change. On every new path I load this image and draw it to a canvas. This canvas is passed into another stream. It looks like this:

// prevent browsers default behavior for dropTargetElement
[ 'drop', 'dragover' ].forEach(function(eventName) {
  Rx.Observable.fromEvent(dropTargetElement, eventName).subscribe(function(event) {
    event.preventDefault();
  });
});

// file path stream merged from openFileInputs change and openFileDropTargets drop
Rx.Observable.merge(Rx.Observable.fromEvent(inputElement, 'change').map(function(event) {
  return event.target.value;
}), Rx.Observable.fromEvent(dropTargetElement, 'drop').map(function(event) {
  return event.dataTransfer.files[0].path;
})).map(function(path) {
  var image = new Image();
  image.src = path;

  // note: I return an Observable in this map function
  // is this even good practice? if yes, is mergeAll the best
  // way to get the "load" event?
  return Rx.Observable.fromEvent(image, 'load');
}).mergeAll().map(function(event) {
  return event.path[0];
}).subscribe(function(image) {
  var canvas = document.createElement('canvas');
  var context = canvas.getContext('2d');

  canvas.width = image.width;
  canvas.height = image.height;
  context.drawImage(image, 0, 0);

  canvasState.onNext(canvas);
});

(Side question: Is it "allowed" to return Observables in map?)

My canvasState looks like this:

var canvasState = new Rx.BehaviorSubject(undefined);

// draw image
canvasState.filter(function(canvas) {
  return !!canvas;
}).subscribe(function drawImage(canvas) {
  document.body.appendChild(canvas);
});

As you can see I append the canvas to the body, if my canvas value is truthy. But every time a new canvas comes in I would like to remove the old one. What is the best way to achieve that? Is something like this possible?

// draw image
canvasState.filter(function(canvas) {
  return !!canvas;
}).beforeNext(function(oldCanvas) {
  // remove old one
}).subscribe(function drawImage(canvas) {
  document.body.appendChild(canvas);
});
like image 534
Pipo Avatar asked Feb 01 '15 14:02

Pipo


1 Answers

Nested Observables

Yes it is normal to have a map operation return an Observable. This gives you an observable stream of observable streams. There are Rx 3 operations to flatten that nested observable one level:

  • mergeAll - subscribe to all of the inner streams as they arrive. You usually end up concurrently subscribed to multiple streams and usually your final stream will have results from the different inner streams intermingled. You can supply a maxConcurrent parameter to limit the number of concurrent inner subscriptions. When the limit is hit, new inner observables get queued and do not get subscribed until a previous inner observable completes.
  • concatAll - subscribe to each inner observable stream one at a time, in order. Your resulting stream will produce items in a predictable order. Concat is just Merge with maxConcurrent set to 1.
  • switch - subscribe to the first inner stream. Then, when a new stream arrives, "switch" to it. Basically unsubscribe from the previous inner stream and subscribe to the latest inner stream. Use switch when you only want to listen to the most recent inner stream.

(In my code, I've also implemented a fourth flattening operator: concatLatest which is like concat, but when an inner stream completes, instead of jumping to the next stream in the queue, it jumps straight to the most recent sequence in the queue, throwing away any inner streams skipped. It is sort of in between concat and switch in behavior and I find it very useful for handling backpressure while still producing results).

So, it is very common to have .map(...).mergeAll() or .map(...).concatAll() or .map(...).switch(). It is so common, that there are Rx methods for these 3 cases:

  • source.flatMap(selector) is equivalent to source.map(selector).mergeAll()
  • source.concatMap(selector) is equivalent to source.map(selector).concatAll()
  • source.flatMapLatest(selector) is equivalent to source.map(selector).switch()

Your Code

With the above information, I'd suggest you use switch instead of merge. As your code stands now, if someone changes the value quickly, you could have 2 image requests out simultaneously and if they complete in the wrong order you will end up with the wrong final canvas. switch will cancel the stale image request and switch over to the new one.

Note that if the user keeps changing the value faster than the image requests can complete, then they could never see any images since you'll keep switching to the new one. This is where I usually use my concatLatest instead because it will complete some intermediate requests now and then while it is attemping to keep up. YMMV.

Cleaning up the old canvases

When my observable stream has a map operation that produces resources that must be cleaned up, I tend to use scan because it can track the previous value for me (without me needing to go make a closure variable to hold the previous value):

canvasState
    .filter(function (c) { return !!c; })
    .scan({}, function (acc, canvas) {
        return { previous: acc.canvas, canvas: canvas };
    })
    .subscribe(function (v) {
        // remove the old canvas
        if (v.previous) { document.body.removeChild(v.previous); }

        // add the new one
        document.body.appendChild(v.canvas);
    });
like image 148
Brandon Avatar answered Oct 14 '22 16:10

Brandon