
Apple Combine framework: How to execute multiple Publishers in parallel and wait for all of them to finish?

Tags: swift, combine

I am discovering Combine. I wrote methods that make HTTP requests in a "combine" way, for example:

func testRawDataTaskPublisher(for url: URL) -> AnyPublisher<Data, Error> {
    var request = URLRequest(url: url,
                             cachePolicy: .useProtocolCachePolicy,
                             timeoutInterval: 15)
    request.httpMethod = "GET"

    return urlSession.dataTaskPublisher(for: request)
        .tryMap {
            return $0.data
        }
        .eraseToAnyPublisher()
}

I would like to call the method multiple times and run a task after all the calls have finished, for example:

let myURLs: [URL] = ...

for url in myURLs {
    let cancellable = testRawDataTaskPublisher(for: url)
        .sink(receiveCompletion: { _ in }) { data in
            // save the data...
        }
}

The code above won't work because I have to store the cancellable in a variable that belongs to the class. The first question is: is it a good idea to store many (for example, 1000) cancellables in something like Set<AnyCancellable>? Won't it cause memory leaks?

var cancellables = Set<AnyCancellable>()

...

    let cancellable = ...

    cancellables.insert(cancellable) // ???

And the second question is: how do I start a task when all the cancellables have finished? I was thinking about something like this:

class Test {
    var cancellables = Set<AnyCancellable>()

    func run() {
        // show a loader

        let cancellable = runDownloads()
            .receive(on: RunLoop.main)
            .sink(receiveCompletion: { _ in }) { _ in
                // hide the loader
            }

        cancellables.insert(cancellable)
    }

    func runDownloads() -> AnyPublisher<Bool, Error> {
        let myURLs: [URL] = ...

        return Future<Bool, Error> { promise in
            let numberOfURLs = myURLs.count
            var numberOfFinishedTasks = 0

            for url in myURLs {
                let cancellable = self.testRawDataTaskPublisher(for: url)
                    .sink(receiveCompletion: { _ in }) { data in
                        // save the data...
                        numberOfFinishedTasks += 1

                        if numberOfFinishedTasks >= numberOfURLs {
                            promise(.success(true))
                        }
                    }

                self.cancellables.insert(cancellable)
            }
        }.eraseToAnyPublisher()
    }

    func testRawDataTaskPublisher(for url: URL) -> AnyPublisher<Data, Error> {
        ...
    }
}

Normally I would use DispatchGroup, start multiple HTTP tasks and consume the notification when the tasks are finished, but I am wondering how to write that in a modern way using Combine.

kampro asked Dec 10 '19 19:12


1 Answer

You can run some operations in parallel by creating a collection of publishers, applying the flatMap operator and then collect to wait for all of the publishers to complete before continuing. Here's an example that you can run in a playground:

import Combine
import Foundation

func delayedPublisher<Value>(_ value: Value, delay after: Double) -> AnyPublisher<Value, Never> {
  let p = PassthroughSubject<Value, Never>()
  DispatchQueue.main.asyncAfter(deadline: .now() + after) {
    p.send(value)
    p.send(completion: .finished)
  }
  return p.eraseToAnyPublisher()
}

let myPublishers = [1,2,3]
  .map{ delayedPublisher($0, delay: 1 / Double($0)).print("\($0)").eraseToAnyPublisher() }

let cancel = myPublishers
  .publisher
  .flatMap { $0 }
  .collect()
  .sink { result in
    print("result:", result)
  }

Here is the output:

1: receive subscription: (PassthroughSubject)
1: request unlimited
2: receive subscription: (PassthroughSubject)
2: request unlimited
3: receive subscription: (PassthroughSubject)
3: request unlimited
3: receive value: (3)
3: receive finished
2: receive value: (2)
2: receive finished
1: receive value: (1)
1: receive finished
result: [3, 2, 1]

Notice that the publishers are all immediately started (in their original order).
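If starting everything at once is too much (the question mentions around 1000 URLs), flatMap also accepts a maxPublishers argument that caps how many inner publishers are subscribed at the same time. Here is a minimal sketch of that idea. The subjects stand in for slow tasks, and the limit of 2 and all of the names are assumptions chosen for illustration:

```swift
import Combine

// Hypothetical stand-ins for three slow tasks. Subjects are used so that
// each task finishes only when we tell it to.
let tasks = (0..<3).map { _ in PassthroughSubject<Int, Never>() }

var started = 0            // how many tasks have been subscribed to so far
var collected: [Int] = []  // filled in once every task has finished

let limited = tasks.publisher
    // Assumption for this sketch: at most 2 tasks may run at once.
    .flatMap(maxPublishers: .max(2)) { task in
        task.handleEvents(receiveSubscription: { _ in started += 1 })
    }
    .collect()
    .sink { collected = $0 }

let startedAtFirst = started          // the third task is still waiting

tasks[0].send(10)
tasks[0].send(completion: .finished)  // frees a slot, so the third task starts
let startedAfterOneFinished = started

tasks[1].send(20)
tasks[1].send(completion: .finished)
tasks[2].send(30)
tasks[2].send(completion: .finished)  // last task done, so collect() emits
```

With the default (.unlimited) all three tasks would be subscribed right away, which is what the playground output shows.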

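In the playground example, the delay of 1 / Double($0) seconds makes the first publisher take the longest to complete. Notice the order of the values in the result: collect gathers values in the order they arrive, not in the order of the original array, so the value from the first publisher comes last.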
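If the results need to come back in the same order as the inputs, one option is to tag each value with its index before flatMap and sort after collect. Below is a rough sketch of how that might look for the downloads in the question. The downloadAll function and its fetch parameter are hypothetical names of mine; fetch stands for any function with the same shape as testRawDataTaskPublisher(for:):

```swift
import Combine
import Foundation

// `fetch` is a placeholder for any function that downloads one URL,
// such as testRawDataTaskPublisher(for:) from the question.
func downloadAll(
    _ urls: [URL],
    using fetch: @escaping (URL) -> AnyPublisher<Data, Error>
) -> AnyPublisher<[Data], Error> {
    urls.enumerated().publisher
        .setFailureType(to: Error.self)  // match the failure type of `fetch`
        .flatMap { pair in
            // Tag every result with the position of its URL.
            fetch(pair.element).map { data in (index: pair.offset, data: data) }
        }
        .collect()                       // wait for all downloads to finish
        .map { results in
            // Put the results back into the order of `urls`.
            results.sorted { $0.index < $1.index }.map { $0.data }
        }
        .eraseToAnyPublisher()
}

// Stub fetch for trying the sketch without a network:
// the "download" is just the text of the URL.
let stubFetch: (URL) -> AnyPublisher<Data, Error> = { url in
    Just(Data(url.absoluteString.utf8))
        .setFailureType(to: Error.self)
        .eraseToAnyPublisher()
}

let urls = ["https://example.com/a", "https://example.com/b"]
    .compactMap { URL(string: $0) }
var downloaded: [String] = []
var finished = false

let token = downloadAll(urls, using: stubFetch)
    .sink(receiveCompletion: { completion in
        if case .finished = completion { finished = true }
    }, receiveValue: { allData in
        downloaded = allData.map { String(decoding: $0, as: UTF8.self) }
    })
```

Note that the caller only holds one cancellable for the whole batch, however many URLs there are, and that a single failed download fails the whole pipeline, so the completion handler is the place to hide the loader in both cases.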

Gil Birman answered Jan 04 '23 20:01