Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Skeleton Example for Swift Combine Publisher-Subscriber

As I port some Objective-C code to Swift, I'm trying to better understand the new Combine framework and how I can use it to re-create a common design pattern.

In this case, the design pattern is a single object (Manager, Service, etc) that any number of "clients" can register with as a delegate to receive callbacks. It's a basic 1:Many pattern using delegates.

Combine looks ideal for this, but the sample code is a bit thin. Below is a working example but I'm not sure if it's correct or being used as intended. In particular, I'm curious about reference cycles between the objects.

class Service {

  let tweets = PassthroughSubject<String, Never>()

  func start() {
    // Simulate the need send to send updates.
    DispatchQueue.global(qos: .utility).async {
      while true {
        self.sendTweet()
        usleep(100000)
      }
    }
  }

  func sendTweet() {
    tweets.send("Message \(Date().timeIntervalSince1970)")
  }
}

class Client : Subscriber {
  typealias Input = String
  typealias Failure = Never

  let service:Service
  var subscription:Subscription?

  init(service:Service) {
    self.service = service

   // Is this a retain cycle?
   // Is this thread-safe? 
    self.service.tweets.subscribe(self) 
  }

  func receive(subscription: Subscription) {
    print("Received subscription: \(subscription)")

    self.subscription = subscription
    self.subscription?.request(.unlimited)
  }

  func receive(_ input: String) -> Subscribers.Demand {
    print("Received tweet: \(input)")
    return .unlimited
  }

  func receive(completion: Subscribers.Completion<Never>) {
    print("Received completion")
  }
}

// Dependency injection is used a lot throughout the 
// application in a similar fashion to this:

let service = Service()
let client = Client(service:service)

// In the real world, the service is started when
// the application is launched and clients come-and-go.

service.start()

Output is:

Received subscription: PassthroughSubject
Received tweet: Message 1560371698.300811
Received tweet: Message 1560371698.4087949
Received tweet: Message 1560371698.578027
...

Is this even remotely close to how Combine was intended to be used?

like image 616
kennyc Avatar asked Jun 12 '19 20:06

kennyc


People also ask

Can we use combine in Swift?

The Combine framework provides a declarative Swift API for processing values over time. These values can represent many kinds of asynchronous events. Combine declares publishers to expose values that can change over time, and subscribers to receive those values from the publishers.

What is AnyPublisher in Swift?

AnyPublisher is a concrete implementation of Publisher that has no significant properties of its own, and passes through elements and completion values from its upstream publisher. Use AnyPublisher to wrap a publisher whose type has details you don't want to expose across API boundaries, such as different modules.

What is PassthroughSubject?

A PassthroughSubject broadcasts elements to downstream subscribers and provides a convenient way to adapt existing imperative code to Combine. As the name suggests, this type of subject only passes through values meaning that it does not capture any state and will drop values if there aren't any subscribers set.

What is backpressure in combine?

There may be some cases though, where processing those received values takes longer while new values arrive. In this case, we may need to control the amount of values that arrive to avoid some kind of blocking or overflow. This concept of limiting the elements the subscriber receives is called backpressure.


1 Answers

lets check it! the simplest way is add deinit to both classes and limit the live of service

class Service {
    
    let tweets = PassthroughSubject<String, Never>()
    
    func start() {
        // Simulate the need send to send updates.
        DispatchQueue.global(qos: .utility).async {
            (0 ... 3).forEach { _ in
                self.sendTweet()
                usleep(100000)
            }
        }
    }
    
    func sendTweet() {
        tweets.send("Message \(Date().timeIntervalSince1970)")
    }
    deinit {
        print("server deinit")
    }
}

now it is easy to check that

do {
    let service = Service()
    //_ = Client(service:service)
    
    // In the real world, the service is started when
    // the application is launched and clients come-and-go.
    
    service.start()
}

will finished as expected

server deinit

modify it with subscribing client

do {
    let service = Service()
    _ = Client(service:service)
    service.start()
}

and you immediately know the result

Received subscription: PassthroughSubject
Received tweet: Message 1580816649.7355099
Received tweet: Message 1580816649.8548698
Received tweet: Message 1580816650.001649
Received tweet: Message 1580816650.102639

there is a memory cycle, as you expected :-)

Generally, there is very low probability, that you need your own subscriber implementation.

First modify the service, so the client will know when no more messages will arrive

func start() {
        // Simulate the need send to send updates.
        DispatchQueue.global(qos: .utility).async {
            // send some tweets
            (0 ... 3).forEach { _ in
                self.sendTweet()
                usleep(100000)
            }
            // and send "finished"
            self.tweets.send(completion: .finished)
        }
    }

and next use "build-in" subcriber in your publisher by invoking his .sink method. .sink return AnyCancelable (it is a reference type) which you have to store somewhere.

var cancelable: AnyCancellable?
do {
    let service = Service()
    service.start()
    
    // client
    cancelable = service.tweets.sink { (s) in
        print(s)
    }
}

now, everythig works, es expected ...

Message 1580818277.2908669
Message 1580818277.4674711
Message 1580818277.641886
server deinit

But what about cancelable? Let check it!

var cancelable: AnyCancellable?
do {
    let service = Service()
    service.start()
    
    // client
    cancelable = service.tweets.sink { (s) in
        print(s)
    }
}
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    print(cancelable)
}

it prints

Message 1580819227.5750608
Message 1580819227.763901
Message 1580819227.9366078
Message 1580819228.072041
server deinit
Optional(Combine.AnyCancellable)

so you have to release it "manualy", if you don't need it anymore. .sink is there again!

var cancelable: AnyCancellable?
do {
    let service = Service()
    service.start()
    
    // client
    cancelable = service.tweets.sink(receiveCompletion: { (completion) in
        print(completion)
        // this inform publisher to "unsubscribe" (not necessery in this scenario)
        cancelable?.cancel()
        // and we can release our canceleble
        cancelable = nil
    }, receiveValue: { (message) in
        print(message)
    })
}
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    print(cancelable)
}

and result

Message 1580819683.462331
Message 1580819683.638145
Message 1580819683.74383
finished
server deinit
nil

Combine has almost everything you need in real word application, the trouble is a lack of documentation, but a lot of sources are available on the internet.

like image 71
user3441734 Avatar answered Oct 11 '22 13:10

user3441734