How to zip more than 4 publishers

Tags: swift, combine
I'm using Swift Combine for my API requests. I now want to run more than 4 requests in parallel and zip them together. Previously I had exactly 4 requests, which I zipped together using Publishers.Zip4. I imagine the zipping can be done in multiple steps, but I don't know how to write the receiveValue closure for that.

Here's a simplification of my current code with 4 parallel requests:

    Publishers.Zip4(request1, request2, request3, request4)
        .sink(receiveCompletion: { completion in
            // completion code if all 4 requests completed
        }, receiveValue: { request1Response, request2Response, request3Response, request4Response in
            // do something with request1Response
            // do something with request2Response
            // do something with request3Response
            // do something with request4Response
        })
        .store(in: &state.subscriptions)

G. Marc asked Feb 21 '20 20:02



2 Answers

The thing that stops you from zipping an arbitrary number of publishers is the very unfortunate fact that Apple has elected to make the output of the zip operators be a tuple. Tuples are extremely inflexible and limited in their power. A tuple's size is fixed at compile time, which is why Combine only ships Zip, Zip3, and Zip4; and you can't append an element to a tuple, because that gives you a different type. What we need, therefore, is a new operator that does the same job as zip but emits some more powerful and flexible result, such as an array.

And we can make one! Luckily, the zip operator itself has a transform parameter that lets us specify what sort of output we want.

Okay, so, to illustrate, I'll zip ten publishers together. First, I'll make an array of ten publishers; they will be mere Just publishers, but that's sufficient to make the point, and to prove that I'm not cheating I'll attach an arbitrary delay to each of them:

let justs = (1...10).map {
    Just($0)
        .delay(for: .seconds(Int.random(in: 1...3)), scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}

Okay, now I've got an array of publishers, and I'll zip them together in a loop:

let result = justs.dropFirst().reduce(into: AnyPublisher(justs[0].map { [$0] })) { res, just in
    res = res.zip(just) { i1, i2 -> [Int] in
        i1 + [i2]
    }.eraseToAnyPublisher()
}

Note the trailing closure after the zip operator! This ensures that my output will be an Array<Int> instead of a tuple. Unlike a tuple, an array can be any size, so I just add an element each time through the loop.
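To make the difference concrete, here is a minimal sketch (not part of the original answer) that zips three Just publishers both ways. The names `a`, `b`, `c`, `tupleOutput`, and `arrayOutput` are made up for illustration.

```swift
import Combine

let a = Just(1)
let b = Just(2)
let c = Just(3)

var subscriptions = Set<AnyCancellable>()

// Plain zip: each step nests the previous tuple, so the output
// type here is ((Int, Int), Int) and changes with every extra step.
var tupleOutput: ((Int, Int), Int)?
a.zip(b).zip(c)
    .sink { value in tupleOutput = value }
    .store(in: &subscriptions)

// zip with a transform closure: every step appends to an array,
// so the output type stays [Int] however many steps follow.
var arrayOutput: [Int]?
a.map { [$0] }
    .zip(b) { $0 + [$1] }
    .zip(c) { $0 + [$1] }
    .sink { value in arrayOutput = value }
    .store(in: &subscriptions)
```

The nested-tuple form is also how a fifth request could be bolted onto a Zip4 in the question's code, but the receiveValue closure then has to unpack tuples inside tuples, which is why the array form scales better.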

Okay, so result is now a publisher (type-erased to AnyPublisher<[Int], Never>) that zips together ten publishers. To prove it, I'll just attach a subscriber to it and print the output:

result.sink {print($0)}.store(in: &self.storage)

We run the code. There is a heart-stopping pause, and rightly so, because each of those Just publishers has its own random delay, and the rule of zip is that they all need to publish before we get any output. They all do, sooner or later, and the output appears in the console:

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Exactly the right answer! I've proved that I did in fact zip ten publishers together to produce output consisting of a single contribution from each of them.

Zipping together an arbitrary number of data task publishers (or whatever you're using) is no different.
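As a rough sketch of what that might look like for network requests (an assumption on my part, not code from the question): `zipAll` and `fetch` are hypothetical helper names, and it assumes every request can be mapped to the same output and failure types.

```swift
import Combine
import Foundation

/// Hypothetical helper: zips any number of same-typed publishers
/// into one publisher whose output is an array, in input order.
func zipAll<T, E: Error>(_ publishers: [AnyPublisher<T, E>]) -> AnyPublisher<[T], E> {
    guard let first = publishers.first else {
        // Nothing to wait for: emit an empty array and finish.
        return Just([T]()).setFailureType(to: E.self).eraseToAnyPublisher()
    }
    // Seed with the first publisher, wrapping each value in an array.
    let seed = first.map { [$0] }.eraseToAnyPublisher()
    // Fold the remaining publishers in, appending one value per step.
    return publishers.dropFirst().reduce(seed) { partial, next in
        partial.zip(next) { (values: [T], value: T) -> [T] in
            values + [value]
        }
        .eraseToAnyPublisher()
    }
}

/// Hypothetical wrapper around a data task that keeps only the body.
func fetch(_ url: URL, session: URLSession = .shared) -> AnyPublisher<Data, URLError> {
    session.dataTaskPublisher(for: url)
        .map { $0.data }
        .eraseToAnyPublisher()
}
```

Given an array of URLs, `zipAll(urls.map { fetch($0) })` could then be subscribed to with sink(receiveCompletion:receiveValue:), where receiveValue receives a single [Data] whose order matches the order of the URLs. If any request fails, the zipped publisher fails with that URLError.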

(For a related question, where I learn how to serialize an arbitrary number of data task publishers, see Combine framework serialize async operations.)

matt answered Sep 30 '22 10:09


Based on Matt's answer:

extension Publishers {
    struct ZipMany<Element, F: Error>: Publisher {
        typealias Output = [Element]
        typealias Failure = F

        private let upstreams: [AnyPublisher<Element, F>]

        init(_ upstreams: [AnyPublisher<Element, F>]) {
            self.upstreams = upstreams
        }

        func receive<S: Subscriber>(subscriber: S) where Self.Failure == S.Failure, Self.Output == S.Input {
            let initial = Just<[Element]>([])
                .setFailureType(to: F.self)
                .eraseToAnyPublisher()

            let zipped = upstreams.reduce(into: initial) { result, upstream in
                result = result.zip(upstream) { elements, element in
                    elements + [element]
                }
                .eraseToAnyPublisher()
            }

            zipped.subscribe(subscriber)
        }
    }
}

A unit test can use the following as input:

let upstreams: [AnyPublisher<String, Never>] = [
    Just("first")
        .receive(on: DispatchQueue.main)
        .eraseToAnyPublisher(),
    Just("second").eraseToAnyPublisher()
]

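The .receive(on:) defers delivery of "first" to a later turn of the main queue, so it arrives after "second". The zipped output should still be ["first", "second"], because the array follows the order of the upstreams, not the order in which they emit.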
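The ordering property that input is meant to exercise can also be checked synchronously, without a run loop. The sketch below is my own variation, not the author's test: it swaps the Just publishers for PassthroughSubject instances so the emission order is explicit, and it repeats ZipMany only so the snippet compiles on its own. The names `first`, `second`, and `received` are illustrative.

```swift
import Combine

// Copy of the ZipMany publisher from this answer, repeated here
// only so the sketch is self-contained.
extension Publishers {
    struct ZipMany<Element, F: Error>: Publisher {
        typealias Output = [Element]
        typealias Failure = F

        private let upstreams: [AnyPublisher<Element, F>]

        init(_ upstreams: [AnyPublisher<Element, F>]) {
            self.upstreams = upstreams
        }

        func receive<S: Subscriber>(subscriber: S) where Self.Failure == S.Failure, Self.Output == S.Input {
            let initial = Just<[Element]>([])
                .setFailureType(to: F.self)
                .eraseToAnyPublisher()
            let zipped = upstreams.reduce(into: initial) { result, upstream in
                result = result.zip(upstream) { elements, element in
                    elements + [element]
                }
                .eraseToAnyPublisher()
            }
            zipped.subscribe(subscriber)
        }
    }
}

// Subjects let the caller decide exactly when each upstream emits.
let first = PassthroughSubject<String, Never>()
let second = PassthroughSubject<String, Never>()

var received: [[String]] = []
let cancellable = Publishers.ZipMany([
    first.eraseToAnyPublisher(),
    second.eraseToAnyPublisher()
])
.sink { received.append($0) }

second.send("second")  // emitted first, but belongs at index 1
first.send("first")    // emitted last, but belongs at index 0
```

In an XCTest case the same check could be written against the Just-based input above by waiting on an expectation instead of sending values by hand.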

abc123 answered Sep 30 '22 08:09