My code contains this simple function I'm using to upload files to my PHP server (there's an xhr request nested in an RxJS/Observable):
fileUpload(file: File): Observable<any> {
    return new Observable( observer => {
        let xhr:XMLHttpRequest = new XMLHttpRequest();
        xhr.onreadystatechange = () => {
            if (xhr.readyState === 4) {
                if (xhr.status === 200) {
                    observer.next(<any>JSON.parse(xhr.response));
                } else {
                    observer.error(xhr.response);
                    observer.complete();
                }
            }
        };
        xhr.open('POST', '__BACKEND__?action=file-upload', true);
        var formData = new FormData();
        formData.append('file', file, file.name);
        xhr.send(formData);
    });
}
It is completely functional, but now I would also like to add some sort of cancellation mechanism to it.
Just unsubscribing from the Observable won't work, because I need to somehow call xhr.abort() or I waste precious resources with large uploads.
Is it possible to get an elegant solution by modifying this code or am I doing it wrong because I'm using an RxJS/Observable for this task?
When you create an Observable, you can specify the unsubscribe behavior by returning a Subscription or a function from the function you pass to the constructor:
fileUpload(file: File): Observable<any> {
    return new Observable( observer => {
        let xhr:XMLHttpRequest = new XMLHttpRequest();
        xhr.onreadystatechange = () => {
            if (xhr.readyState === 4) {
                if (xhr.status === 200) {
                    observer.next(<any>JSON.parse(xhr.response));
                    observer.complete();
                } else {
                    observer.error(xhr.response);
                }
            }
        };
        xhr.open('POST', '__BACKEND__?action=file-upload', true);
        var formData = new FormData();
        formData.append('file', file, file.name);
        xhr.send(formData);
        //Return the tear down logic. 
        //You may also want to check here that it has not already completed
        //Since this gets called in all cases when the `Subscription` terminates
        return () => xhr.abort();
    });
}
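The comment about checking whether the request has already completed can be made concrete. What follows is only a minimal sketch: `AbortableRequest` and `createTeardown` are hypothetical names introduced for illustration, and the guard is optional, since calling abort() on a finished request is generally harmless.

```typescript
// Hypothetical minimal shape of the request; a real XMLHttpRequest satisfies it.
interface AbortableRequest {
    readyState: number;
    abort(): void;
}

const DONE = 4; // same value as XMLHttpRequest.DONE

// Builds the function to return from the Observable's subscribe callback.
// It aborts only while the request is still in flight.
function createTeardown(xhr: AbortableRequest): () => void {
    return () => {
        if (xhr.readyState !== DONE) {
            xhr.abort();
        }
    };
}

// Inside fileUpload the last line would become:
//     return createTeardown(xhr);
// and a caller cancels by unsubscribing:
//     const subscription = fileUpload(file).subscribe(...);
//     subscription.unsubscribe(); // runs the teardown, which calls xhr.abort()
```

Keeping the guard in a small helper leaves the Observable body unchanged apart from its return statement, so the cancellation path stays the same as in the answer above.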
Have the Observable emit the xhr object and call abort() on it from another Observable:
var uploadObservable = fileUpload();
var uploadRequest;
uploadObservable.subscribe(
  function (x) {
    uploadRequest = x;
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  });
// cancelBtn is the button element; use a separate name for the click stream
var cancelClicks = Rx.Observable.fromEvent(cancelBtn, 'click');
cancelClicks.subscribe(
  function (x) {
    uploadRequest.abort();
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  });
Or
 fileUpload()
    .flatMap(function(xhr) {
      // flatMap must return an Observable, so abort as a side effect of each click
      return Rx.Observable.fromEvent(cancelBtn, 'click').do(function() { xhr.abort(); });
    })
.subscribe(...);