
RxSwift, how do I chain different observables

I am still a beginner in reactive programming, and in RxSwift in particular. I want to chain two different operations. In my case I simply want to download a zip file from a web server and then unzip it locally. At the same time, I also want to show the progress of the download. So I started by creating the first observable:

class func rx_download(req: URLRequestConvertible, testId: String) -> Observable<Float> {

    let destination: Request.DownloadFileDestination = ...

    let obs: Observable<Float> = Observable.create { observer in
        let request = Alamofire.download(req, destination: destination)
        request.progress { _, totalBytesWritten, totalBytesExpectedToWrite in
            if totalBytesExpectedToWrite > 0 {
                observer.onNext(Float(totalBytesWritten) / Float(totalBytesExpectedToWrite))
            } else {
                observer.onNext(0)
            }
        }
        request.response { _, response, _, error in
            if let responseURL = response {
                if responseURL.statusCode == 200 {
                    observer.onNext(1.0)
                    observer.onCompleted()
                } else {
                    let error = NSError(domain: "error", code: responseURL.statusCode, userInfo: nil)
                    observer.onError(error)
                }
            } else {
                let error = NSError(domain: "error", code: 500, userInfo: nil)
                observer.onError(error)
            }
        }
        return AnonymousDisposable {
            request.cancel()
        }
    }
    return obs.retry(3)
}

After that, I created a similar function for the unzip step:

class func rx_unzip(testId: String) -> Observable<Float> {
    return Observable.create { observer in
        do {
            try Zip.unzipFile(NSURL.archivePath(testId), destination: NSURL.resourceDirectory(testId), overwrite: true, password: nil) { progress in
                observer.onNext(Float(progress))
            }
            // Complete only when unzipping succeeded.
            observer.onCompleted()
        } catch let error {
            observer.onError(error)
        }
        return NopDisposable.instance
    }
}

For now I have this logic in the "view model layer": I download, subscribe to the completed event, and then unzip.

What I want is to combine the two Observables into one, so that the download runs first and the file is unzipped once it completes. Is there any way to do this?

Punty asked Mar 11 '16 09:03


2 Answers

You can use the concat operator to chain these two Observables. The resulting Observable emits the next values from the first one and, when it completes, the values from the second one.

There is a caveat: you will get progress values ranging from 0.0 to 1.0 from rx_download, and then the progress from rx_unzip will start again at 0.0. This might confuse the user if you show the progress in a single progress view.

A possible approach would be to show a label describing what is happening along with the progress view. You can map each Observable to a tuple containing the description text and the progress value, and then use concat. It could look like this:

// rx_download and rx_unzip stand for the Observables returned by the functions in the question
let mappedDownload = rx_download.map {
    return ("Downloading", $0)
}

let mappedUnzip = rx_unzip.map {
    return ("Unzipping", $0)
}

mappedDownload.concat(mappedUnzip)
    .subscribeNext({ (description, progress) in
        // set progress and show description
    })

Of course, there are many possible solutions, but this is more of a design problem than a coding one.
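
One such alternative, if a single progress view is preferred, is to scale each stage's progress into its own slice of one overall 0...1 range. The sketch below is plain Swift with no RxSwift dependency; the Stage enum, the overallProgress function and the 0.8 download weight are all assumptions made up for illustration, not part of any library.

```swift
// Hypothetical helper, not part of RxSwift: maps a stage's own 0...1 progress
// into that stage's slice of one overall 0...1 range.
enum Stage {
    case download
    case unzip
}

func overallProgress(stage: Stage, progress: Float, downloadWeight: Float = 0.8) -> Float {
    // Clamp so a stray value cannot push the bar outside 0...1.
    let p = min(max(progress, 0), 1)
    switch stage {
    case .download:
        // The download fills the first part of the bar.
        return p * downloadWeight
    case .unzip:
        // Unzipping starts where the download slice ends.
        return downloadWeight + p * (1 - downloadWeight)
    }
}

let halfwayThroughDownload = overallProgress(stage: .download, progress: 0.5)
let unzipJustStarted = overallProgress(stage: .unzip, progress: 0.0)
let unzipFinished = overallProgress(stage: .unzip, progress: 1.0)
```

With the Observables from the question, each stage could presumably be mapped through a function like this before calling concat, so the subscriber only ever sees one increasing value.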

Michał Ciuba answered Sep 17 '22 17:09


Concat operator requires the same data type

Indeed, the concat operator lets you enforce the order of the Observables. One issue you might run into, however, is that concat requires the Observables to have the same generic type.

let numbers     : Observable<Int>    = Observable.from([1,2,3])
let moreNumbers : Observable<Int>    = Observable.from([4,5,6])
let names       : Observable<String> = Observable.from(["Jose Rizal", "Leonor Rivera"])


// This works
numbers.concat(moreNumbers)

// Compile error
numbers.concat(names)
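
If the two sequences really do carry different element types, one possible workaround is to map both to a common type before concatenating. The sketch below uses plain Swift arrays in place of Observables so that it runs without RxSwift; the Item enum and the sourceNumbers/sourceNames names are hypothetical and introduced only for this illustration. The same map-then-concat shape should carry over to Observables.

```swift
// Hypothetical wrapper type that gives both kinds of element one common type.
enum Item: Equatable {
    case number(Int)
    case name(String)
}

// Plain arrays stand in for the Observables so the sketch needs no dependencies.
let sourceNumbers = [1, 2, 3]
let sourceNames = ["Jose Rizal", "Leonor Rivera"]

// Map each source to the common type, then concatenate in order.
let combined: [Item] = sourceNumbers.map { Item.number($0) } + sourceNames.map { Item.name($0) }
```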

FlatMap operator allows you to chain a sequence of Observables

Here's an example.

class Tag {
    var tag: String = ""
    init (tag: String) {
        self.tag = tag
    }
}

let getRequestReadHTML : Observable<String> = Observable
                            .just("<HTML><BODY>Hello world</BODY></HTML>")

func getTagsFromHtml(htmlBody: String) -> Observable<Tag> {
    return Observable.create { obx in

        // do parsing on htmlBody as necessary

        obx.onNext(Tag(tag: "<HTML>"))
        obx.onNext(Tag(tag: "<BODY>"))
        obx.onNext(Tag(tag: "</BODY>"))
        obx.onNext(Tag(tag: "</HTML>"))

        obx.onCompleted()

        return Disposables.create()
    }
}

getRequestReadHTML
    .flatMap{ getTagsFromHtml(htmlBody: $0) }
    .subscribe (onNext: { e in
        print(e.tag)
    })

Notice how getRequestReadHTML is of type Observable<String>, while the function getTagsFromHtml returns an Observable<Tag>.

Using multiple flatMaps can increase the number of emissions

Be wary, however: for each element it receives, flatMap subscribes to the Observable returned by its closure and re-emits every element of that inner Observable. This is why it is described as a 1...n transformation.

If you define an Observable such as a network call and you are sure it emits only once, you will not run into any problems, since the transformation is 1...1 (i.e. one Observable to one NSData). Great!

However, if your Observable emits multiple times, be careful: each chained flatMap multiplies the number of emissions by the number of elements its inner Observable emits.

As a concrete example, suppose the first Observable emits 3 elements and a flatMap transforms each one 1...n with n = 2: there are now 6 emissions in total. A second flatMap with n = 2 brings the total to 12. Double-check that this is the behavior you expect.
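
The arithmetic can be checked with a small sketch that uses Swift's standard Array.flatMap as a stand-in for the Rx operator. This is an analogy chosen so the code runs without RxSwift, and the variable names are made up for the example; only the element counts are the point.

```swift
// Arrays stand in for Observables; Array.flatMap flattens in the same 1...n way.
let source = [1, 2, 3]                                   // 3 emissions

// Each element expands into n = 2 elements.
let afterFirst = source.flatMap { [$0, $0 * 10] }        // 3 * 2 = 6 emissions
let afterSecond = afterFirst.flatMap { [$0, $0 + 1] }    // 6 * 2 = 12 emissions
```

Each step multiplies the count by n, so k chained flatMaps with the same n turn m emissions into m * n^k.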

dsapalo answered Sep 17 '22 17:09