Using Apple's new Combine framework I want to make multiple requests from each element in a list. Then I want a single result from a reduction of all the the responses. Basically I want to go from list of publishers to a single publisher that holds a list of responses.
I've tried making a list of publishers, but I don't know how to reduce that list into a single publisher. And I've tried making a publisher containing a list but I can't flat map a list of publishers.
Please look at the "createIngredients" function
func createIngredient(ingredient: Ingredient) -> AnyPublisher<CreateIngredientMutation.Data, Error> { return apollo.performPub(mutation: CreateIngredientMutation(name: ingredient.name, optionalProduct: ingredient.productId, quantity: ingredient.quantity, unit: ingredient.unit)) .eraseToAnyPublisher() } func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> { // first attempt let results = ingredients .map(createIngredient) // results = [AnyPublisher<CreateIngredientMutation.Data, Error>] // second attempt return Publishers.Just(ingredients) .eraseToAnyPublisher() .flatMap { (list: [Ingredient]) -> Publisher<[CreateIngredientMutation.Data], Error> in return list.map(createIngredient) // [AnyPublisher<CreateIngredientMutation.Data, Error>] } }
I'm not sure how to take an array of publishers and convert that to a publisher containing an array.
Result value of type '[AnyPublisher]' does not conform to closure result type 'Publisher'
Overview. A publisher delivers elements to one or more Subscriber instances. The subscriber's Input and Failure associated types must match the Output and Failure types declared by the publisher. The publisher implements the receive(subscriber:) method to accept a subscriber.
AnyPublisher is a concrete implementation of Publisher that has no significant properties of its own, and passes through elements and completion values from its upstream publisher. Use AnyPublisher to wrap a publisher whose type has details you don't want to expose across API boundaries, such as different modules.
A PassthroughSubject broadcasts elements to downstream subscribers and provides a convenient way to adapt existing imperative code to Combine. As the name suggests, this type of subject only passes through values meaning that it does not capture any state and will drop values if there aren't any subscribers set.
Essentially, in your specific situation you're looking at something like this:
func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> { Publishers.MergeMany(ingredients.map(createIngredient(ingredient:))) .collect() .eraseToAnyPublisher() }
This 'collects' all the elements produced by the upstream publishers and – once they have all completed – produces an array with all the results and finally completes itself.
Bear in mind, if one of the upstream publishers fails – or produces more than one result – the number of elements may not match the number of subscribers, so you may need additional operators to mitigate this depending on your situation.
The more generic answer, with a way you can test it using the EntwineTest framework:
import XCTest import Combine import EntwineTest final class MyTests: XCTestCase { func testCreateArrayFromArrayOfPublishers() { typealias SimplePublisher = Just<Int> // we'll create our 'list of publishers' here. Each publisher emits a single // Int and then completes successfully – using the `Just` publisher. let publishers: [SimplePublisher] = [ SimplePublisher(1), SimplePublisher(2), SimplePublisher(3), ] // we'll turn our array of publishers into a single merged publisher let publisherOfPublishers = Publishers.MergeMany(publishers) // Then we `collect` all the individual publisher elements results into // a single array let finalPublisher = publisherOfPublishers.collect() // Let's test what we expect to happen, will happen. // We'll create a scheduler to run our test on let testScheduler = TestScheduler() // Then we'll start a test. Our test will subscribe to our publisher // at a virtual time of 200, and cancel the subscription at 900 let testableSubscriber = testScheduler.start { finalPublisher } // we're expecting that, immediately upon subscription, our results will // arrive. This is because we're using `just` type publishers which // dispatch their contents as soon as they're subscribed to XCTAssertEqual(testableSubscriber.recordedOutput, [ (200, .subscription), // we're expecting to subscribe at 200 (200, .input([1, 2, 3])), // then receive an array of results immediately (200, .completion(.finished)), // the `collect` operator finishes immediately after completion ]) } }
I think that Publishers.MergeMany
could be of help here. In your example, you might use it like so:
func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<CreateIngredientMutation.Data, Error> { let publishers = ingredients.map(createIngredient(ingredient:)) return Publishers.MergeMany(publishers).eraseToAnyPublisher() }
That will give you a publisher that sends you single values of the Output
.
However, if you specifically want the Output
in an array all at once at the end of all your publishers completing, you can use collect()
with MergeMany
:
func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> { let publishers = ingredients.map(createIngredient(ingredient:)) return Publishers.MergeMany(publishers).collect().eraseToAnyPublisher() }
And either of the above examples you could simplify into a single line if you prefer, ie:
func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<CreateIngredientMutation.Data, Error> { Publishers.MergeMany(ingredients.map(createIngredient(ingredient:))).eraseToAnyPublisher() }
You could also define your own custom merge()
extension method on Sequence
and use that to simplify the code slightly:
extension Sequence where Element: Publisher { func merge() -> Publishers.MergeMany<Element> { Publishers.MergeMany(self) } } func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<CreateIngredientMutation.Data, Error> { ingredients.map(createIngredient).merge().eraseToAnyPublisher() }
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With