
RxSwift propagating a value through a chain of flatMaps

Tags:

swift

rx-swift

So I have some RxSwift code where I want to perform a chain of asynchronous operations, all composed using observables. flatMap is the way to do this, and it works great; however, as far as I can figure out, it can't pass variables down the chain. This is best illustrated by some pseudocode.

Assume I have three functions:

class Connection {
    static func establish(address:String) -> Observable<Connection>
    func sendData(data:String) -> Observable<Int> // num bytes written or something
    func close() -> Observable<Void>
}

And I want to call them in a chain such that we connect, send, then close. Something like this:

Connection.establish(host)
    .flatMap{ connection in connection.sendData("foo") }
    .flatMap{ numBytes in ????.close() }
    .subscribeNext{ /* all done */ }

The problem is that flatMap doesn't pass its input parameters down the chain, so the closure passed to the second flatMap doesn't have access to the connection object, and as such it can't call close.

I could do some awful hack like the following, but I'd really rather not!

var connection:Connection?
Connection.establish(host)
    .flatMap{ c in 
        connection = c
        return c.sendData("foo") 
    }
    .flatMap{ numBytes in connection!.close() }
    .subscribeNext{ /* all done */ }

In the C# version of Rx, this is solved by an overload of SelectMany that takes a second closure. That closure combines the two values (usually into a Tuple), and the result is propagated down the chain. I've written this as an extension for RxSwift, and it works as follows:

Connection.establish(host)
    .flatMap(
        { connection in connection.sendData("foo") },
        combine: { ($0, $1) }) // tupleify
    .flatMap{ (connection, numbytes) in connection.close() }
    .subscribeNext{ /* all done */ }

This is all well and good, but my primary question is - Is there a better way to do this which is built into RxSwift as it currently stands?

Additionally, writing this extension method is neither simple nor easy. I basically re-implemented FlatMap from scratch by copy/pasting the one in MiniRxSwift and modifying it. If we have to write this extension, is there a better way to implement it using RxSwift constructs?

Orion Edwards asked Apr 28 '16 04:04


2 Answers

There are two ways to do what you want "using RxSwift constructs."

Connection.establish(host)
    .flatMap { Observable.combineLatest(Observable.just($0), $0.sendData("foo")) }
    .flatMap { connection, _ in connection.close() }
    .subscribe(onNext: { /* all done */ })

or, if you don't mind nesting a map inside the flatMap, you could write:

Connection.establish(host)
    .flatMap { connection in
        connection.sendData("foo").map { (connection, $0) }
    }
    .flatMap { connection, _ in connection.close() }
    .subscribe(onNext: { /* all done */ })

Note that combineLatest and map were both in the library from the beginning.
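
As for the second part of the question (implementing the two-closure flatMap without copying the operator's internals), the map approach above can be wrapped in an extension. This is only a sketch: the `flatMap(_:combine:)` overload is a hypothetical helper, not part of RxSwift, and the `Element` associated type assumes RxSwift 5 or later (earlier versions call it `E`).

```swift
import RxSwift

extension ObservableType {
    /// Hypothetical helper, not an RxSwift API: flat-maps each element and
    /// pairs it with every element emitted by the inner observable it produced.
    func flatMap<Inner: ObservableConvertibleType, Combined>(
        _ selector: @escaping (Element) throws -> Inner,
        combine: @escaping (Element, Inner.Element) throws -> Combined
    ) -> Observable<Combined> {
        // Reuse the built-in flatMap; the nested map captures the outer value.
        return flatMap { outer in
            try selector(outer).asObservable().map { inner in
                try combine(outer, inner)
            }
        }
    }
}

// Usage: carry the outer value alongside the inner result as a tuple.
let pairs = Observable.just(2)
    .flatMap({ Observable.just($0 * 10) }, combine: { ($0, $1) })

_ = pairs.subscribe(onNext: { pair in
    print(pair) // the single emitted pair is (2, 20)
})
```

With something like this in place, the chain from the question could keep its `combine: { ($0, $1) }` form while relying only on the library's own flatMap and map.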

Daniel T. answered Nov 04 '22 07:11


You can do that in your establish function. It probably looks like this:

static func establish(address:String) -> Observable<Connection> {
    return Observable.create { observer in
        //create connection
        observer.onNext(connection)
        observer.onCompleted()

        return AnonymousDisposable { connection.close() }
    }
}

When the subscription is disposed, the connection is closed as well.
I think this is one of the best options here.

Alternatively, the connection can be passed down through the chain using combineLatest, flatMap, or other operators, but that gets tedious :)


Reply to Orion Edwards

I'm not sure this would work. In my example the disposable is never actually disposed; we want to close the connection as soon as the operation completes instead.

Well, I'm pretty sure this is how the RxSwift people would do it.
Once you have your result, whether the sequence completed or errored out, you dispose of the subscription. If you want to run the operation again, you subscribe again.

You can use one of these to dispose of it (or to check that it happened):

  • Use the take family: take(1), takeUntil...
  • Call dispose() on the disposable returned by subscribe when you finish (discouraged).
  • Use debug() to see when the subscription is disposed.
  • ...
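
A minimal sketch of the take(1) option, under some assumptions: `FakeConnection`, `closeNow()`, and this free `establish` function are hypothetical stand-ins for the types in the question, and `Disposables.create` is the spelling newer RxSwift versions use where the snippet above shows `AnonymousDisposable`.

```swift
import RxSwift

// Hypothetical stand-in for the question's Connection type.
final class FakeConnection {
    private(set) var isClosed = false
    // Synchronous close, for illustration only.
    func closeNow() { isClosed = true }
}

// Hypothetical establish: emits the connection and never completes on its own,
// so cleanup happens only when the subscription is disposed.
func establish(_ connection: FakeConnection) -> Observable<FakeConnection> {
    return Observable.create { observer in
        observer.onNext(connection)
        // Teardown closure: runs when the subscription is disposed.
        return Disposables.create { connection.closeNow() }
    }
}

let connection = FakeConnection()

_ = establish(connection)
    .take(1) // completes after the first element, which disposes the upstream
    .subscribe(onNext: { _ in print("got connection") })

print("closed:", connection.isClosed)
```

Because take(1) ends the sequence after one element, nobody has to call dispose() by hand; the teardown closure is what closes the connection.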
Pham Hoan answered Nov 04 '22 08:11