Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to adjust Combine's Publisher demand without a custom Subscriber?

In Combine framework there's a concept of a demand, which allows signalling backpressure to publishers.

Suppose I have a simple publisher:

let numbers = Publishers.Sequence<ClosedRange<Int>, Error>(sequence: 0...100)

I would like to download certain URLs that use these numbers as arguments. I also would like a next download to start only after a previous download has finished.

A naive approach then would look like this:

let subscription = numbers.sink(receiveCompletion: { _ in }, receiveValue: {
  let url = URL(string: "https://httpbin.org/get?value=\($0)")!
  URLSession.shared.dataTask(with: url) { 
    $0.map { print(String(data: $0, encoding: .utf8)!) }
  }.resume()
})

Unfortunately, this wouldn't satisfy the requirement of waiting for a previous download to complete before starting the next one. As far as I know, sink function would return a value of type AnyCancellable, not of type Subscription. If the latter was the case, we could call the request function on the subscription with a specific demand after an upload completes.

What would be the best way to control demand of a subscription provided by sink or any other standard Combine Subscriber?

like image 711
Max Desiatov Avatar asked Mar 03 '23 03:03

Max Desiatov


1 Answers

Turns out, flatMap operator takes an additional maxPublishers argument that takes a Subscribers.Demand value. In combination with the Future publisher, this allows the numbers publisher to wait until the future is able to process a given value before sending a next one.

Applying this to the original code, downloading values one after another would look like this:

enum DownloadError: Error {
    case noData
}

let subscription = numbers.flatMap(maxPublishers: .max(1)) { number in
    Future { promise in
        let url = URL(string: "https://httpbin.org/get?value=\(number)")!
        URLSession.shared.dataTask(with: url) {
            switch ($0, $2) {
            case let (data?, nil):
                promise(.success(data))
            case let (nil, error?):
                promise(.failure(error))
            default:
                promise(.failure(DownloadError.noData))
            }
        }.resume()
    }
}.sink(
    receiveCompletion: { _ in print("errors should be handled here") },
    receiveValue: { print(String(data: $0, encoding: .utf8)!) }
)
like image 77
Max Desiatov Avatar answered Apr 30 '23 05:04

Max Desiatov