I'm using Swift Combine for my API requests. Now I'm facing a situation where I want to have more than 4 parallel requests that I want to zip together. Before I had exactly 4 requests that I zipped together using Zip4() operator. I can imagine that you do the zipping in multiple steps but I don't know how to write the receiveValue for it.
Here's a simplification of my current code with 4 parallel requests:
Publishers.Zip4(request1, request2, request3, request4)
.sink(receiveCompletion: { completion in
// completion code if all 4 requests completed
}, receiveValue: { request1Response, request2Response, request3Response, request4Response in
// do something with request1Response
// do something with request2Response
// do something with request3Response
// do something with request4Response
}
)
.store(in: &state.subscriptions)
CombineLatest publisher collects the first value from all three publishers and emits them as a single tuple. CombineLatest continues sending new values even when only one publisher emits a new value. On the other hand, the Zip operator sends a new value only when all the publishers emit new values.
Overview. The Combine framework provides a declarative Swift API for processing values over time. These values can represent many kinds of asynchronous events. Combine declares publishers to expose values that can change over time, and subscribers to receive those values from the publishers.
The thing that stops you from zipping an arbitrary number of publishers is the very unfortunate fact that Apple has elected to make the output of the zip operators be a tuple. Tuples are extremely inflexible and limited in their power. You can’t have a tuple of, say, ten elements; and you can’t even append an element to a tuple, because that causes you to get a different type. What we need, therefore, is a new operator that does the same job as zip
but emits some more powerful and flexible result, such as an array.
And we can make one! Luckily, the zip
operator itself has a transform
parameter that lets us specify what sort of output we want.
Okay, so, to illustrate, I'll zip ten publishers together. First, I'll make an array of ten publishers; they will be mere Just publishers, but that's sufficient to make the point, and to prove that I'm not cheating I'll attach an arbitrary delay to each of them:
let justs = (1...10).map {
Just($0)
.delay(for: .seconds(Int.random(in:1...3)), scheduler: DispatchQueue.main)
.eraseToAnyPublisher() }
Okay, now I've got an array of publishers, and I'll zip them together in a loop:
let result = justs.dropFirst().reduce(into: AnyPublisher(justs[0].map{[$0]})) {
res, just in
res = res.zip(just) {
i1, i2 -> [Int] in
return i1 + [i2]
}.eraseToAnyPublisher()
}
Note the trailing closure after the zip
operator! This ensures that my output will be an Array<Int>
instead of a tuple. Unlike a tuple, I'm allowed to make an array of any size, just adding elements each time thru the loop.
Okay, so result
is now a Zip publisher that zips together ten publishers. To prove it, I'll just attach a subscriber to it and print the output:
result.sink {print($0)}.store(in: &self.storage)
We run the code. There is a heart-stopping pause — rightly, because each of those Just publishers has a different random delay, and the rule of zip is that they all need to publish before we get any output. They all do, sooner or later, and the output appears in the console:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Exactly the right answer! I've proved that I did in fact zip ten publishers together to produce output consisting of a single contribution from each of them.
Zipping together an arbitrary number of data task publishers (or whatever you're using) is no different.
(For a related question, where I learn how to serialize an arbitrary number of data task publishers, see Combine framework serialize async operations.)
Based on Matt's answer:
extension Publishers {
struct ZipMany<Element, F: Error>: Publisher {
typealias Output = [Element]
typealias Failure = F
private let upstreams: [AnyPublisher<Element, F>]
init(_ upstreams: [AnyPublisher<Element, F>]) {
self.upstreams = upstreams
}
func receive<S: Subscriber>(subscriber: S) where Self.Failure == S.Failure, Self.Output == S.Input {
let initial = Just<[Element]>([])
.setFailureType(to: F.self)
.eraseToAnyPublisher()
let zipped = upstreams.reduce(into: initial) { result, upstream in
result = result.zip(upstream) { elements, element in
elements + [element]
}
.eraseToAnyPublisher()
}
zipped.subscribe(subscriber)
}
}
}
A unit test can use the following as input:
let upstreams: [AnyPublisher<String, Never>] = [
Just("first")
.receive(on: DispatchQueue.main)
.eraseToAnyPublisher(),
Just("second").eraseToAnyPublisher()
]
The .receive(on:)
puts that event's emission on the end of the main queue so that it will emit after "second"
.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With