
How to process an array of tasks asynchronously with Swift Combine

Tags:

swift

combine

I have a publisher that makes a network call and returns an array of IDs. I now need to make another network call for each ID to fetch all of my data, and I want the final publisher to emit the resulting object.

First network result:

"user": {
   "id": 0,
   "items": [1, 2, 3, 4, 5]
}

Final object:

struct User {
    let id: Int
    let items: [Item]
    ... other fields ...
}
struct Item {
    let id: Int
    ... other fields ...
}

Handling multiple network calls:

userPublisher.flatMap { user in
    let itemIDs = user.items
    return Future<[Item], Never>() { fulfill in
        ... OperationQueue of network requests ...
    }
}

I would like to perform the network requests in parallel, since they are not dependent on each other. I'm not sure if Future is right here, but I'd imagine I would then have code to do a DispatchGroup or OperationQueue and fulfill when they're all done. Is there more of a Combine way of doing this?

Does Combine have a concept of splitting one stream into many parallel streams and then joining those streams back together?

Michael Ozeryansky asked Dec 23 '22 20:12


1 Answer

Combine offers extensions around URLSession to handle network requests. Unless you really need to integrate with OperationQueue-based networking, in which case Future is a fine candidate, I'd suggest using them. You can run multiple Futures and collect their results at some point, but I'd really look at the URLSession extensions for Combine first.

import Combine
import Foundation

struct User: Codable {
    var username: String
}

let requestURL = URL(string: "https://example.com/")!
// Emits the decoded User, or fails with a networking or decoding error.
let publisher = URLSession.shared.dataTaskPublisher(for: requestURL)
    .map { $0.data }
    .decode(type: User.self, decoder: JSONDecoder())
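For the callback-based case mentioned above, here is a minimal sketch of wrapping each call in a Future and merging the results. `loadItem(id:completion:)` is a hypothetical stand-in for OperationQueue-style networking, and `Item`, `itemFuture`, and `loadItems` are illustrative names, not part of any library:

```swift
import Combine
import Foundation

struct Item: Equatable {
    let id: Int
}

// Hypothetical callback-based API; it answers on a background queue
// with a canned value instead of doing real networking.
func loadItem(id: Int, completion: @escaping (Result<Item, Error>) -> Void) {
    DispatchQueue.global().async {
        completion(.success(Item(id: id)))
    }
}

// Wrap one callback-based call in a Future, which fulfills exactly once.
func itemFuture(id: Int) -> Future<Item, Error> {
    Future { promise in
        loadItem(id: id) { result in promise(result) }
    }
}

// Create one Future per ID, merge them, and gather the results into one array.
func loadItems(ids: [Int]) -> AnyPublisher<[Item], Error> {
    Publishers.MergeMany(ids.map(itemFuture))
        .collect()
        .eraseToAnyPublisher()
}
```

Note that a Future runs its closure as soon as it is created, so every request starts when `loadItems(ids:)` is called rather than when something subscribes. The merged results arrive in completion order, which may differ from the order of `ids`.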

To run a batch of requests, you can use Publishers.MergeMany, for example:

import Combine
import Foundation

struct User: Codable {
    var username: String
}

let userIds = [1, 2, 3]

// Keep the returned AnyCancellable alive; releasing it cancels the requests.
let cancellable = Just(userIds)
    .setFailureType(to: Error.self)
    .flatMap { (values) -> Publishers.MergeMany<AnyPublisher<User, Error>> in
        // Build one request publisher per user ID.
        let tasks = values.map { (userId) -> AnyPublisher<User, Error> in
            let requestURL = URL(string: "https://jsonplaceholder.typicode.com/users/\(userId)")!

            return URLSession.shared.dataTaskPublisher(for: requestURL)
                .map { $0.data }
                .decode(type: User.self, decoder: JSONDecoder())
                .eraseToAnyPublisher()
        }
        // Subscribe to all of them at once and merge their output.
        return Publishers.MergeMany(tasks)
    }
    .collect()
    .sink(receiveCompletion: { (completion) in
        if case .failure(let error) = completion {
            print("Got error: \(error.localizedDescription)")
        }
    }, receiveValue: { (allUsers) in
        print("Got users:")
        allUsers.forEach { print("\($0)") }
    })

In the example above I use collect to gather all the results, which postpones emitting a value to the sink until all of the network requests have finished successfully. You can also drop collect and receive each User one by one as the network requests complete.
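Applied to the shapes in the question, the same MergeMany plus collect pattern might look like the sketch below. `UserResponse`, `fetchUserResponse`, `fetchItem`, and `fetchUser` are hypothetical names, and the two fetch functions emit canned values where real code would presumably use dataTaskPublisher and decode:

```swift
import Combine
import Foundation

// Shapes taken from the question; the remaining fields are omitted.
struct Item: Equatable { let id: Int }
struct User { let id: Int; let items: [Item] }

// Hypothetical decoded form of the first response, which only carries item IDs.
struct UserResponse { let id: Int; let items: [Int] }

// Hypothetical stand-ins for the real network publishers.
func fetchUserResponse() -> AnyPublisher<UserResponse, Error> {
    Just(UserResponse(id: 0, items: [1, 2, 3, 4, 5]))
        .setFailureType(to: Error.self)
        .eraseToAnyPublisher()
}

func fetchItem(id: Int) -> AnyPublisher<Item, Error> {
    Just(Item(id: id))
        .setFailureType(to: Error.self)
        .eraseToAnyPublisher()
}

// One request per item ID, merged, collected, then combined into a User.
func fetchUser() -> AnyPublisher<User, Error> {
    fetchUserResponse()
        .flatMap { response -> AnyPublisher<User, Error> in
            return Publishers.MergeMany(response.items.map(fetchItem))
                .collect()
                .map { fetched -> User in
                    // MergeMany emits in completion order, so put the
                    // items back in the order the IDs were listed.
                    let byID = Dictionary(fetched.map { ($0.id, $0) },
                                          uniquingKeysWith: { first, _ in first })
                    let ordered = response.items.compactMap { byID[$0] }
                    return User(id: response.id, items: ordered)
                }
                .eraseToAnyPublisher()
        }
        .eraseToAnyPublisher()
}
```

Subscribing with sink then delivers a single User once every item request has finished. If any one request fails, the whole pipeline fails, so a per-item fallback (for example with catch or replaceError inside `fetchItem`) would be needed if partial results are acceptable.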

Rob Zombie answered May 20 '23 05:05