I am discovering Combine. I wrote methods that make HTTP requests in a "combine" way, for example:
func testRawDataTaskPublisher(for url: URL) -> AnyPublisher<Data, Error> {
var request = URLRequest(url: url,
cachePolicy: .useProtocolCachePolicy,
timeoutInterval: 15)
request.httpMethod = "GET"
return urlSession.dataTaskPublisher(for: request)
.tryMap {
return $0.data
}
.eraseToAnyPublisher()
}
I would like to call the method multiple times and do a task after all, for example:
let myURLs: [URL] = ...
for url in myURLs {
let cancellable = testRawDataTaskPublisher(for: url)
.sink(receiveCompletion: { _ in }) { data in
// save the data...
}
}
The code above won't work because I have to store the cancellable in a variable that belongs to the class.
The first question is: is it a good idea to store many (for example 1000) cancellables in something like Set<AnyCancellable>
??? Won't it cause memory leaks?
var cancellables = Set<AnyCancellable>()
...
let cancellable = ...
cancellables.insert(cancellable) // ???
And the second question is: how to start a task when all the cancellables are finished? I was thinking about something like that
class Test {
var cancellables = Set<AnyCancellable>()
func run() {
// show a loader
let cancellable = runDownloads()
.receive(on: RunLoop.main)
.sink(receiveCompletion: { _ in }) { _ in
// hide the loader
}
cancellables.insert(cancellable)
}
func runDownloads() -> AnyPublisher<Bool, Error> {
let myURLs: [URL] = ...
return Future<Bool, Error> { promise in
let numberOfURLs = myURLS.count
var numberOfFinishedTasks = 0
for url in myURLs {
let cancellable = testRawDataTaskPublisher(for: url)
.sink(receiveCompletion: { _ in }) { data in
// save the data...
numberOfFinishedTasks += 1
if numberOfFinishedTasks >= numberOfURLs {
promise(.success(true))
}
}
cancellables.insert(cancellable)
}
}.eraseToAnyPublisher()
}
func testRawDataTaskPublisher(for url: URL) -> AnyPublisher<Data, Error> {
...
}
}
Normally I would use DispatchGroup
, start multiple HTTP tasks and consume the notification when the tasks are finished, but I am wondering how to write that in a modern way using Combine.
You can run some operations in parallel by creating a collection of publishers, applying the flatMap
operator and then collect
to wait for all of the publishers to complete before continuing. Here's an example that you can run in a playground:
import Combine
import Foundation
func delayedPublisher<Value>(_ value: Value, delay after: Double) -> AnyPublisher<Value, Never> {
let p = PassthroughSubject<Value, Never>()
DispatchQueue.main.asyncAfter(deadline: .now() + after) {
p.send(value)
p.send(completion: .finished)
}
return p.eraseToAnyPublisher()
}
let myPublishers = [1,2,3]
.map{ delayedPublisher($0, delay: 1 / Double($0)).print("\($0)").eraseToAnyPublisher() }
let cancel = myPublishers
.publisher
.flatMap { $0 }
.collect()
.sink { result in
print("result:", result)
}
Here is the output:
1: receive subscription: (PassthroughSubject)
1: request unlimited
2: receive subscription: (PassthroughSubject)
2: request unlimited
3: receive subscription: (PassthroughSubject)
3: request unlimited
3: receive value: (3)
3: receive finished
2: receive value: (2)
2: receive finished
1: receive value: (1)
1: receive finished
result: [3, 2, 1]
Notice that the publishers are all immediately started (in their original order).
The 1 / $0
delay causes the first publisher to take the longest to complete. Notice the order of the values at the end. Since the first took the longest to complete, it is the last item.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With