As I port some Objective-C code to Swift, I'm trying to better understand the new Combine framework and how I can use it to re-create a common design pattern.
In this case, the design pattern is a single object (Manager, Service, etc.) that any number of "clients" can register with as a delegate to receive callbacks. It's a basic 1:Many pattern using delegates.
Combine looks ideal for this, but the sample code is a bit thin. Below is a working example, but I'm not sure if it's correct or being used as intended. In particular, I'm curious about reference cycles between the objects.
import Combine
import Foundation

class Service {
    let tweets = PassthroughSubject<String, Never>()
    func start() {
        // Simulate the need to send updates.
        DispatchQueue.global(qos: .utility).async {
            while true {
                self.sendTweet()
                usleep(100000)
            }
        }
    }
    func sendTweet() {
        tweets.send("Message \(Date().timeIntervalSince1970)")
    }
}
class Client: Subscriber {
    typealias Input = String
    typealias Failure = Never
    let service: Service
    var subscription: Subscription?
    init(service: Service) {
        self.service = service
        // Is this a retain cycle?
        // Is this thread-safe?
        self.service.tweets.subscribe(self)
    }
    func receive(subscription: Subscription) {
        print("Received subscription: \(subscription)")
        self.subscription = subscription
        self.subscription?.request(.unlimited)
    }
    func receive(_ input: String) -> Subscribers.Demand {
        print("Received tweet: \(input)")
        return .unlimited
    }
    func receive(completion: Subscribers.Completion<Never>) {
        print("Received completion")
    }
}
// Dependency injection is used a lot throughout the
// application in a similar fashion to this:
let service = Service()
let client = Client(service: service)
// In the real world, the service is started when
// the application is launched and clients come-and-go.
service.start()
Output is:
Received subscription: PassthroughSubject
Received tweet: Message 1560371698.300811
Received tweet: Message 1560371698.4087949
Received tweet: Message 1560371698.578027
...
Is this even remotely close to how Combine was intended to be used?
The Combine framework provides a declarative Swift API for processing values over time. These values can represent many kinds of asynchronous events. Combine declares publishers to expose values that can change over time, and subscribers to receive those values from the publishers.
AnyPublisher is a concrete implementation of Publisher that has no significant properties of its own, and passes through elements and completion values from its upstream publisher. Use AnyPublisher to wrap a publisher whose type has details you don't want to expose across API boundaries, such as different modules.
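As a rough sketch of that idea applied to the question's service: the subject can stay private while clients only see a type-erased publisher. The names TweetService and post(_:) are made up for this illustration and are not part of the original code.

```swift
import Combine

// Hypothetical variant of the Service in the question: the subject is private,
// and clients only get a read-only, type-erased view of it.
final class TweetService {
    private let subject = PassthroughSubject<String, Never>()

    // Clients can subscribe to this, but cannot call send(_:) on it.
    var tweets: AnyPublisher<String, Never> {
        subject.eraseToAnyPublisher()
    }

    func post(_ text: String) {
        subject.send(text)
    }
}

let tweetService = TweetService()
var erasedReceived: [String] = []

// Keep the returned AnyCancellable alive for as long as values are wanted.
let erasedToken = tweetService.tweets.sink { erasedReceived.append($0) }

tweetService.post("hello")
tweetService.post("world")
print(erasedReceived) // ["hello", "world"]
```

Whether hiding the subject is worth it depends on how much you trust the clients; for a small app, exposing the subject directly, as the question does, also works.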
A PassthroughSubject broadcasts elements to downstream subscribers and provides a convenient way to adapt existing imperative code to Combine. As the name suggests, this type of subject only passes through values meaning that it does not capture any state and will drop values if there aren't any subscribers set.
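A minimal sketch of the "drops values" behaviour described above, using an Int subject invented for the example: a value sent before anyone subscribes is simply lost.

```swift
import Combine

let subject = PassthroughSubject<Int, Never>()
var seen: [Int] = []

// No subscriber is attached yet, so this value is dropped.
subject.send(1)

// From here on, values are delivered to the sink closure.
let token = subject.sink { seen.append($0) }
subject.send(2)
subject.send(3)

print(seen) // [2, 3]
```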
There may be cases, though, where processing received values takes long enough that new values arrive in the meantime. In this case, we may need to control the number of values that arrive to avoid some kind of blocking or overflow. This concept of limiting the elements the subscriber receives is called backpressure.
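To make the demand mechanism a bit more concrete, here is a small sketch with a custom subscriber that only ever asks for a fixed number of values. LimitedSubscriber is a hypothetical name, and a sequence publisher is used instead of the question's subject because it is easy to reason about.

```swift
import Combine

// Hypothetical subscriber that requests at most `limit` values in total.
final class LimitedSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    let limit: Int
    private(set) var received: [Int] = []

    init(limit: Int) {
        self.limit = limit
    }

    func receive(subscription: Subscription) {
        // Initial demand: no more than `limit` values.
        subscription.request(.max(limit))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        // .none means "do not add to my current demand".
        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) {}
}

let limited = LimitedSubscriber(limit: 2)
(1...5).publisher.subscribe(limited)

print(limited.received) // [1, 2]
```

The subscriber in the question does the opposite: it requests .unlimited, so it opts out of backpressure entirely.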
Let's check it! The simplest way is to add deinit to both classes and limit the lifetime of the service:
class Service {
    let tweets = PassthroughSubject<String, Never>()
    func start() {
        // Simulate the need to send updates.
        DispatchQueue.global(qos: .utility).async {
            (0 ... 3).forEach { _ in
                self.sendTweet()
                usleep(100000)
            }
        }
    }
    func sendTweet() {
        tweets.send("Message \(Date().timeIntervalSince1970)")
    }
    deinit {
        print("server deinit")
    }
}
Now it is easy to check that
do {
    let service = Service()
    //_ = Client(service:service)
    // In the real world, the service is started when
    // the application is launched and clients come-and-go.
    service.start()
}
finishes as expected:
server deinit
Now modify it by subscribing a client:
do {
    let service = Service()
    _ = Client(service: service)
    service.start()
}
and you immediately see the result:
Received subscription: PassthroughSubject
Received tweet: Message 1580816649.7355099
Received tweet: Message 1580816649.8548698
Received tweet: Message 1580816650.001649
Received tweet: Message 1580816650.102639
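The service is never deinitialized ("server deinit" is not printed), so there is a retain cycle, as you expected :-) The client holds the service strongly, and the subject keeps its subscriber alive until the subscription is cancelled or completes.
Generally, you will very rarely need your own Subscriber implementation.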
First, modify the service so the client knows when no more messages will arrive:
func start() {
    // Simulate the need to send updates.
    DispatchQueue.global(qos: .utility).async {
        // send some tweets
        (0 ... 3).forEach { _ in
            self.sendTweet()
            usleep(100000)
        }
        // and send "finished"
        self.tweets.send(completion: .finished)
    }
}
Next, use the built-in subscriber of your publisher by invoking its .sink method. .sink returns an AnyCancellable (a reference type), which you have to store somewhere.
var cancelable: AnyCancellable?
do {
    let service = Service()
    service.start()
    // client (it subscribes after start(), so a tweet sent before
    // this line runs may be dropped)
    cancelable = service.tweets.sink { (s) in
        print(s)
    }
}
Now everything works as expected:
Message 1580818277.2908669
Message 1580818277.4674711
Message 1580818277.641886
server deinit
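The "you have to store it somewhere" part is easy to get wrong, so here is a small sketch of what happens when the AnyCancellable is thrown away. The subject and variable names are invented for this example.

```swift
import Combine

let numbers = PassthroughSubject<Int, Never>()
var kept: [Int] = []
var discarded: [Int] = []

// The token is stored, so this subscription stays alive.
let keptToken = numbers.sink { kept.append($0) }

// The token is discarded immediately. AnyCancellable cancels itself
// when it is deinitialized, so this subscription ends right here.
_ = numbers.sink { discarded.append($0) }

numbers.send(1)
numbers.send(2)

print(kept)      // [1, 2]
print(discarded) // []
```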
But what about cancelable? Let's check it!
var cancelable: AnyCancellable?
do {
    let service = Service()
    service.start()
    // client
    cancelable = service.tweets.sink { (s) in
        print(s)
    }
}
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    print(cancelable)
}
It prints:
Message 1580819227.5750608
Message 1580819227.763901
Message 1580819227.9366078
Message 1580819228.072041
server deinit
Optional(Combine.AnyCancellable)
So you have to release it "manually" if you don't need it anymore. .sink helps again, this time with its receiveCompletion closure:
var cancelable: AnyCancellable?
do {
    let service = Service()
    service.start()
    // client
    cancelable = service.tweets.sink(receiveCompletion: { (completion) in
        print(completion)
        // this informs the publisher to "unsubscribe" (not necessary in this scenario)
        cancelable?.cancel()
        // and we can release our cancellable
        cancelable = nil
    }, receiveValue: { (message) in
        print(message)
    })
}
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    print(cancelable)
}
and the result:
Message 1580819683.462331
Message 1580819683.638145
Message 1580819683.74383
finished
server deinit
nil
Combine has almost everything you need in a real-world application. The trouble is the lack of documentation, but a lot of resources are available on the internet.
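Putting the pieces above together for the original 1:Many question, one possible shape for a client is sketched below. It is only an illustration under a few assumptions: Feed and Listener are hypothetical stand-ins for Service and Client, the closure captures self weakly so the subscription does not keep the client alive, and the tokens live in a Set<AnyCancellable> so they are cancelled when the client goes away.

```swift
import Combine

// Hypothetical stand-in for the Service in the question.
final class Feed {
    let tweets = PassthroughSubject<String, Never>()
}

// Records which listeners were deinitialized, for demonstration only.
var deinitLog: [String] = []

// Hypothetical stand-in for the Client in the question.
final class Listener {
    let name: String
    private(set) var inbox: [String] = []
    private var cancellables = Set<AnyCancellable>()

    init(name: String, feed: Feed) {
        self.name = name
        feed.tweets
            // [weak self] avoids a cycle between the listener and its subscription.
            .sink { [weak self] tweet in self?.inbox.append(tweet) }
            // The token is cancelled automatically when the listener is deallocated.
            .store(in: &cancellables)
    }

    deinit {
        deinitLog.append(name)
    }
}

let feed = Feed()
var first: Listener? = Listener(name: "first", feed: feed)
let second = Listener(name: "second", feed: feed)

feed.tweets.send("one")
first = nil              // clients come and go
feed.tweets.send("two")

print(second.inbox) // ["one", "two"]
print(deinitLog)    // ["first"]
```

This sketch sends everything from one thread; it does not try to answer the thread-safety question from the original code.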