
With Combine, how to deallocate the Subscription after a network request

If you use Combine for network requests with URLSession, then you need to save the Subscription (aka, the AnyCancellable) - otherwise it gets immediately deallocated, which cancels the network request. Later, when the network response has been processed, you want to deallocate the subscription, because keeping it around would be a waste of memory.

Below is some code that does this. It's kind of awkward, and it may not even be correct. I can imagine a race condition where the network request could start and complete on another thread before sub is set to the non-nil value.

Is there a nicer way to do this?

class SomeThing {
    var subs = Set<AnyCancellable>()
    func sendNetworkRequest() {
        var request: URLRequest = ...
        var sub: AnyCancellable? = nil            
        sub = URLSession.shared.dataTaskPublisher(for: request)
            .map(\.data)
            .decode(type: MyResponse.self, decoder: JSONDecoder())
            .sink(
                receiveCompletion: { completion in                
                    self.subs.remove(sub!)
                }, 
                receiveValue: { response in ... }
            )
        subs.insert(sub!)
    }
}

Rob N asked May 29 '20 21:05



2 Answers

I call this situation a one-shot subscriber. The idea is that, because a data task publisher publishes only once, you know for a fact that it is safe to destroy the pipeline after you receive your single value and/or completion (error).

Here's a technique I like to use. First, here's the head of the pipeline:

let url = URL(string:"https://www.apeth.com/pep/manny.jpg")!
let pub : AnyPublisher<UIImage?,Never> =
    URLSession.shared.dataTaskPublisher(for: url)
        .map {$0.data}
        .replaceError(with: Data())
        .compactMap { UIImage(data:$0) }
        .receive(on: DispatchQueue.main)
        .eraseToAnyPublisher()

Now comes the interesting part. Watch closely:

var cancellable: AnyCancellable? // 1
cancellable = pub.sink(receiveCompletion: {_ in // 2
    cancellable?.cancel() // 3
}) { image in
    self.imageView.image = image
}

Do you see what I did there? Perhaps not, so I'll explain it:

  1. First, I declare a local AnyCancellable variable; for reasons having to do with the rules of Swift syntax, this needs to be an Optional.

  2. Then, I create my subscriber and set my AnyCancellable variable to that subscriber. Again, for reasons having to do with the rules of Swift syntax, my subscriber needs to be a Sink.

  3. Finally, in the subscriber itself, I cancel the AnyCancellable when I receive the completion.

The cancellation in the third step actually does two things quite apart from calling cancel() — things having to do with memory management:

  • By referring to cancellable inside the asynchronous completion function of the Sink, I keep cancellable and the whole pipeline alive long enough for a value to arrive from the publisher.

  • By cancelling cancellable, I permit the pipeline to go out of existence and prevent a retain cycle that would cause the surrounding view controller to leak.
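
To see the pattern in isolation, here is a minimal sketch that swaps the data task publisher for a PassthroughSubject so it runs without a network. The subscribeOnce helper, the demo function, and their parameter names are hypothetical and not part of the code above; the point is only that the caller stores nothing and the value still arrives.

```swift
import Combine

// Hypothetical helper: subscribes with the one-shot pattern and returns
// nothing, so the caller never holds an AnyCancellable.
func subscribeOnce(to publisher: AnyPublisher<Int, Never>,
                   onValue: @escaping (Int) -> Void,
                   onDone: @escaping () -> Void) {
    var cancellable: AnyCancellable?
    cancellable = publisher.sink(receiveCompletion: { _ in
        onDone()
        // Capturing `cancellable` here keeps the pipeline alive until
        // completion; cancelling lets it be released afterwards.
        cancellable?.cancel()
    }, receiveValue: onValue)
}

// Hypothetical driver: sends one value and a completion after
// subscribeOnce has already returned.
func demo() -> (values: [Int], finished: Bool) {
    let subject = PassthroughSubject<Int, Never>()
    var values: [Int] = []
    var finished = false
    subscribeOnce(to: subject.eraseToAnyPublisher(),
                  onValue: { values.append($0) },
                  onDone: { finished = true })
    subject.send(42)
    subject.send(completion: .finished)
    return (values, finished)
}
```

Calling demo() should report the single value and the completion, even though nothing outside subscribeOnce ever stored the subscription.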

matt answered Sep 17 '22 14:09



> Below is some code that does this. It's kind of awkward, and it may not even be correct. I can imagine a race condition where the network request could start and complete on another thread before sub is set to the non-nil value.

Danger! Swift.Set is not thread safe. If you want to access a Set from two different threads, it is up to you to serialize the accesses so they don't overlap.

What is possible in general (although perhaps not with URLSession.DataTaskPublisher) is that a publisher emits its signals synchronously, before the sink operator even returns. This is how Just, Result.Publisher, Publishers.Sequence, and others behave. So those produce the problem you're describing without any threads being involved.
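
A small sketch of that synchronous case, using Just. The function name is hypothetical; it only records whether sub was still nil at the moment the completion closure ran.

```swift
import Combine

// Hypothetical check: returns true if the completion arrived before
// `sink` returned, i.e. while `sub` had not been assigned yet.
func completesBeforeSinkReturns() -> Bool {
    var sub: AnyCancellable? = nil
    var completedWhileNil = false
    sub = Just(1).sink(
        receiveCompletion: { _ in
            // Just delivers its value and completion during subscription,
            // so the assignment to `sub` has not happened yet.
            completedWhileNil = (sub == nil)
        },
        receiveValue: { _ in }
    )
    sub?.cancel()
    return completedWhileNil
}
```

With a publisher like this, force-unwrapping sub inside the completion closure, as in the question's code, would crash.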

Now, how to solve the problem? If you don't need to be able to cancel the subscription, then you can avoid creating an AnyCancellable at all by using Subscribers.Sink instead of the sink operator:

        URLSession.shared.dataTaskPublisher(for: request)
            .map(\.data)
            .decode(type: MyResponse.self, decoder: JSONDecoder())
            .subscribe(Subscribers.Sink(
                receiveCompletion: { completion in ... },
                receiveValue: { response in ... }
            ))

Combine will clean up the subscription and the subscriber after the subscription completes (with either .finished or .failure).
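
Here is a minimal sketch of that behavior with a PassthroughSubject standing in for the data task publisher, so it runs without a network. The demo function is hypothetical. Note that subscribe(_:) returns Void, so there is nothing to store.

```swift
import Combine

// Hypothetical driver: attaches a Subscribers.Sink directly and then
// sends one value and a completion.
func demo() -> (values: [Int], finished: Bool) {
    let subject = PassthroughSubject<Int, Never>()
    var values: [Int] = []
    var finished = false

    // No AnyCancellable is created here; the subscription is kept alive
    // by the publisher side until it completes.
    subject.subscribe(Subscribers.Sink<Int, Never>(
        receiveCompletion: { _ in finished = true },
        receiveValue: { values.append($0) }
    ))

    subject.send(7)
    subject.send(completion: .finished)
    return (values, finished)
}
```

The trade-off is the one described above: without an AnyCancellable there is no handle for cancelling the request early.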

But what if you do want to be able to cancel the subscription? Maybe sometimes your SomeThing gets destroyed before the subscription is complete, and you don't need the subscription to complete in that case. Then you do want to create an AnyCancellable and store it in an instance property, so that it gets cancelled when SomeThing is destroyed.

In that case, set a flag indicating that the sink won the race, and check the flag before storing the AnyCancellable.

        var sub: AnyCancellable? = nil
        var isComplete = false
        sub = URLSession.shared.dataTaskPublisher(for: request)
            .map(\.data)
            .decode(type: MyResponse.self, decoder: JSONDecoder())
            // This ensures thread safety, if the subscription is also created
            // on DispatchQueue.main.
            .receive(on: DispatchQueue.main)
            .sink(
                receiveCompletion: { [weak self] completion in
                    isComplete = true
                    if let theSub = sub {
                        self?.subs.remove(theSub)
                    }
                }, 
                receiveValue: { response in ... }
            )
        if !isComplete {
            subs.insert(sub!)
        }
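
The same flag technique can be exercised without a network. In this sketch, the RequestTracker class and its track method are hypothetical stand-ins for SomeThing and sendNetworkRequest, and the publishers are a Just (completes synchronously) and a PassthroughSubject (completes later). It assumes everything runs on one thread, so the receive(on:) step is left out.

```swift
import Combine

// Hypothetical stand-in for SomeThing.
final class RequestTracker {
    var subs = Set<AnyCancellable>()

    func track(_ publisher: AnyPublisher<Int, Never>,
               onValue: @escaping (Int) -> Void) {
        var sub: AnyCancellable? = nil
        var isComplete = false
        sub = publisher.sink(
            receiveCompletion: { [weak self] _ in
                isComplete = true
                // `sub` is still nil if completion arrived synchronously.
                if let theSub = sub {
                    self?.subs.remove(theSub)
                }
            },
            receiveValue: onValue
        )
        // Only store the subscription if it has not already completed.
        if !isComplete, let sub = sub {
            subs.insert(sub)
        }
    }
}

let tracker = RequestTracker()

// Synchronous publisher: the sink wins the race, nothing is stored.
tracker.track(Just(1).eraseToAnyPublisher(), onValue: { _ in })
let storedAfterJust = tracker.subs.count

// Asynchronous-style publisher: stored first, removed on completion.
let subject = PassthroughSubject<Int, Never>()
tracker.track(subject.eraseToAnyPublisher(), onValue: { _ in })
let storedWhilePending = tracker.subs.count
subject.send(completion: .finished)
let storedAfterCompletion = tracker.subs.count
```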

rob mayoff answered Sep 19 '22 14:09
