Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to handle progress update using ReactiveX Observables/Subjects?

I'm writing an Angular app which uses the ReactiveX API to handle asynchronous operations. I used the API before in an Android project and I really like how it simplifies concurrent task handling. But there is one thing which I'm not sure how to solve in a right way.

How to update observer from an ongoing task? The task in this case will take time to load/create a complex/large object and I'm able to return intermediate progress, but not the object itself. The observable can only return one dataType. Therefor I know two possibilities.

  1. Create an object which has a progress field and a data field. This object can be simply returned with Observable.onNext(object). The progress field will update on every onNext, while the data field is empty until the last onNext, which will set it to the loaded value.

  2. Create two observables, a data observable and a progress observable. The observer hast to subscribe to the progress observable for progress updates and to the data observable to be notified when the data is finally loaded/created. These can also be optionally be zipped together for one subscription.

I used both techniques, they both work, but I want to know if there is a unified standard, a clean way, how to solve this task. It can, of course, as well be a completly new one. Im open for every solution.

like image 688
TardigradeX Avatar asked Sep 11 '17 08:09

TardigradeX


1 Answers

After careful consideration I use a solution similar to option two in my question. The main observable is concerned with the actual result of the operation. A http request in this case, but the File iteration example is similar. It is returned by the "work" function.

A second Observer/Subscriber can be added through a function parameter. This subscriber is concerned only with the progress information. This way all operations are nullsafe and no type checks are needed.

A second version of the work function, without the progress Observer, can be used if no progress UI update is needed.

export class FileUploadService {

 doWork(formData: FormData, url: string): Subject<Response> {
    return this.privateDoWork(formData, url, null);
 }

 doWorkWithProgress(formData: FormData, url: string, progressObserver: Observer<number>): Subject<Response> {
    return this.privateDoWork(formData, url, progressObserver);
 }

 private privateDoWork(formData: FormData, url: string, progressObserver: Observer<number> | null): Subject<Response> {

     return Observable.create(resultObserver => {
     let xhr: XMLHttpRequest = new XMLHttpRequest();
     xhr.open("POST", url);

     xhr.onload = (evt) => {
         if (progressObserver) {
            progressObserver.next(1);
            progressObserver.complete();
            }
         resultObserver.next((<any>evt.target).response);
         resultObserver.complete()
     };
     xhr.upload.onprogress = (evt) => {
         if (progressObserver) {
            progressObserver.next(evt.loaded / evt.total);
         }

     };
     xhr.onabort = (evt) => resultObserver.error("Upload aborted by user");
     xhr.onerror = (evt) => resultObserver.error("Error");

     xhr.send(formData);
     });
 }

Here is a call of the function including the progress Subscriber. With this solution the caller of the upload function must create/handle/teardown the progress subscriber.

 this.fileUploadService.doWorkWithProgress(this.chosenSerie.formData, url, new Subscriber((progress) => console.log(progress * 100)).subscribe(
    (result) => console.log(result),
    (error) => console.log(error),
    () => console.log("request Completed")
    );

Overall I prefered this solution to a "Pair" Object with a single subscription. There is no null handling nececcary, and I got a clean seperation of concerns.

The example is written in Typescript, but similar solutions should be possible with other ReactiveX implementations.

like image 120
TardigradeX Avatar answered Sep 26 '22 09:09

TardigradeX