I have to streams which I merge to get an URL to load an image: one stream for drop events and one for an file input change. On every new path I load this image and draw it to a canvas. This canvas is passed into another stream. It looks like this:
// prevent browsers default behavior for dropTargetElement
[ 'drop', 'dragover' ].forEach(function(eventName) {
Rx.Observable.fromEvent(dropTargetElement, eventName).subscribe(function(event) {
event.preventDefault();
});
});
// file path stream merged from openFileInputs change and openFileDropTargets drop
Rx.Observable.merge(Rx.Observable.fromEvent(inputElement, 'change').map(function(event) {
return event.target.value;
}), Rx.Observable.fromEvent(dropTargetElement, 'drop').map(function(event) {
return event.dataTransfer.files[0].path;
})).map(function(path) {
var image = new Image();
image.src = path;
// note: I return an Observable in this map function
// is this even good practice? if yes, is mergeAll the best
// way to get the "load" event?
return Rx.Observable.fromEvent(image, 'load');
}).mergeAll().map(function(event) {
return event.path[0];
}).subscribe(function(image) {
var canvas = document.createElement('canvas');
var context = canvas.getContext('2d');
canvas.width = image.width;
canvas.height = image.height;
context.drawImage(image, 0, 0);
canvasState.onNext(canvas);
});
(Side question: Is it "allowed" to return Observables in map
?)
My canvasState
looks like this:
var canvasState = new Rx.BehaviorSubject(undefined);
// draw image
canvasState.filter(function(canvas) {
return !!canvas;
}).subscribe(function drawImage(canvas) {
document.body.appendChild(canvas);
});
As you can see I append the canvas to the body, if my canvas value is truthy. But every time a new canvas comes in I would like to remove the old one. What is the best way to achieve that? Is something like this possible?
// draw image
canvasState.filter(function(canvas) {
return !!canvas;
}).beforeNext(function(oldCanvas) {
// remove old one
}).subscribe(function drawImage(canvas) {
document.body.appendChild(canvas);
});
Yes it is normal to have a map
operation return an Observable. This gives you an observable stream of observable streams. There are Rx 3 operations to flatten that nested observable one level:
maxConcurrent
parameter to limit the number of concurrent inner subscriptions. When the limit is hit, new inner observables get queued and do not get subscribed until a previous inner observable completes.Merge
with maxConcurrent set to 1.switch
when you only want to listen to the most recent inner stream.(In my code, I've also implemented a fourth flattening operator: concatLatest
which is like concat
, but when an inner stream completes, instead of jumping to the next stream in the queue, it jumps straight to the most recent sequence in the queue, throwing away any inner streams skipped. It is sort of in between concat
and switch
in behavior and I find it very useful for handling backpressure while still producing results).
So, it is very common to have .map(...).mergeAll()
or .map(...).concatAll()
or .map(...).switch()
. It is so common, that there are Rx methods for these 3 cases:
source.flatMap(selector)
is equivalent to source.map(selector).mergeAll()
source.concatMap(selector)
is equivalent to source.map(selector).concatAll()
source.flatMapLatest(selector)
is equivalent to source.map(selector).switch()
With the above information, I'd suggest you use switch
instead of merge
. As your code stands now, if someone changes the value quickly, you could have 2 image requests out simultaneously and if they complete in the wrong order you will end up with the wrong final canvas. switch
will cancel the stale image request and switch over to the new one.
Note that if the user keeps changing the value faster than the image requests can complete, then they could never see any images since you'll keep switching to the new one. This is where I usually use my concatLatest
instead because it will complete some intermediate requests now and then while it is attemping to keep up. YMMV.
When my observable stream has a map operation that produces resources that must be cleaned up, I tend to use scan
because it can track the previous value for me (without me needing to go make a closure variable to hold the previous value):
canvasState
.filter(function (c) { return !!c; })
.scan({}, function (acc, canvas) {
return { previous: acc.canvas, canvas: canvas };
})
.subscribe(function (v) {
// remove the old canvas
if (v.previous) { document.body.removeChild(v.previous); }
// add the new one
document.body.appendChild(v.canvas);
});
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With