So I have some RxSwift code where I want to perform a chain of asynchronous operations, all composed using observables. flatMap
is the way to do this, and it works great, however it doesn't seem to be able to pass variables down the chain that I can figure out. This is best illustrated by some pseudocode
Assume have 3 functions
class Connection {
static func establish(address:String) -> Observable<Connection>
func sendData(data:String) -> Observable<Int> // num bytes written or something
func close() -> Observable<Void>
}
And I want to call them in a chain such that we connect, send, then close. Something like this
Connection.establish(host)
.flatMap{ connection in connection.sendData("foo") }
.flatMap{ numBytes in ????.close() }
.subscribeNext{ /* all done */ }
The problem is that flatMap
doesn't pass it's input parameters down the chain, so that the closure passed to subscribeNext
doesn't have access to the connection
object, and as such it can't call close.
I could do some awful hack like the following, but I'd really rather not!
var connection:Connection?
Connection.establish(host)
.flatMap{ c in
connection = c
return c.sendData("foo")
}
.flatMap{ numBytes in connection!.close() }
.subscribeNext{ /* all done */ }
In the C# version of Rx, this is solved by using an overload to SelectMany
which takes a second closure, which combines the 2 values (usually into a Tuple) and then that thing is propagated down the chain. I've written this as an extension for RxSwfit, and it works as follows:
Connection.establish(host)
.flatMap(
{ connection in connection.sendData("foo") },
combine: { ($0, $1) }) // tupleify
.flatMap{ (connection, numbytes) in connection.close() }
.subscribeNext{ /* all done */ }
This is all well and good, but my primary question is - Is there a better way to do this which is built into RxSwift as it currently stands?
Additionally, writing this extension method is not simple nor easy. I basically re-implemented FlatMap from scratch by copy/pasting the one in MiniRxSwift and modifying it. If we have to write this extension, is there a better way to implement it using RxSwift constructs?
There are two ways to do what you want "using RxSwift constructs."
Connection.establish(host)
.flatMap { Observable.combineLatest(Observable.just($0), $0.sendData("foo")) }
.flatMap { connection, _ in connection.close() }
.subscribe(onNext: { /* all done */ })
or if you don't mind inserting into a map you could:
Connection.establish(host)
.flatMap { connection in
connection.sendData("foo").map { (connection, $0) }
}
.flatMap { connection, _ in connection.close() }
.subscribe(onNext: { /* all done */ })
Note that combineLatest
and map
were both in the library from the beginning.
You can do that in your establish
function. It probably looks like this:
static func establish(address:String) -> Observable<Connection> {
return Observable.create { observer in
//create connection
observer.onNext(connection)
observer.onCompleted()
return AnonymousDisposable { connection.close() }
}
}
When your observer
being disposed, it will also close your connection.
And I think this is one of the best option here.
However, we can passed down the connection though the chain by using combineLatest
or flatMap
or other functions. But it will be tedious :)
Reply to Orion Edwards
'm not sure this would work; In my example the disposable is never actually disposed, we want to close the connection as soon as the operation completes instead
Well, I'm pretty sure this is how RxSwift's people would do.
If you got your results, either it completed
or error
out, you dispose the observer. And if you want to do it again, then you re-subscribe
it.
You can use one of these functions in order to dispose it:
take
family: take(1)
, takeUntil
...dispose()
on observers when you finish. (discourage) debug()
to see when an observer is disposed If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With