 

Combine framework: how to process each element of array asynchronously before proceeding

Tags: ios, swift, combine

I'm having a bit of a mental block using the iOS Combine framework.

I'm converting some code from "manual" fetching from a remote API to using Combine. Basically, the API is SQL and REST (in actual fact it's Salesforce, but that's irrelevant to the question). What the code used to do is call a REST query method that takes a completion handler. What I'm doing is replacing this everywhere with a Combine Future. So far, so good.

The problem arises when the following scenario happens (and it happens a lot):

  1. We do a REST query and get back an array of "objects".

  2. But these "objects" are not completely populated. Each one of them needs additional data from some related object. So for each "object", we do another REST query using information from that "object", thus giving us another array of "objects".

  3. This might or might not allow us to finish populating the first "objects"; otherwise, we might have to do another REST query using information from each of the second "objects", and so on.

The result was a lot of code structured like this (this is pseudocode):

func fetchObjects(completion: @escaping ([Object]) -> Void) {
    let restQuery = ...
    RESTClient.performQuery(restQuery) { results in
        let partialObjects = results.map { ... }
        let group = DispatchGroup()
        for partialObject in partialObjects {
            let restQuery = ... // something based on partialObject
            group.enter()
            RESTClient.performQuery(restQuery) { results in
                let partialObjects2 = results.map { ... }
                partialObject.property1 = // something from partialObjects2
                partialObject.property2 = // something from partialObjects2
                // and we could go down yet _another_ level in some cases
                group.leave() // only once this object has been populated
            }
        }
        group.notify(queue: .main) {
            completion(partialObjects)
        }
    }
}

Every time I write "results in" in the pseudocode, that's the completion handler of an asynchronous networking call.
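For reference, here is a minimal runnable sketch of that DispatchGroup pattern. `performQuery` and `Object` are hypothetical stand-ins for the RESTClient call and the model type; `Object` is a class because the pseudocode mutates `partialObject` in place. It uses `notify(queue: .global())` only so that this script can block on a semaphore; an app would typically notify on `.main`.

```swift
import Foundation

// Hypothetical stand-in for RESTClient.performQuery: after a short delay on a
// background queue, it "returns" two results derived from the query string.
func performQuery(_ query: String, completion: @escaping ([String]) -> Void) {
    DispatchQueue.global().asyncAfter(deadline: .now() + 0.05) {
        completion([query + "1", query + "2"])
    }
}

// Hypothetical model type. It is a class because the loop mutates each
// element in place, as the pseudocode does with `partialObject`.
final class Object {
    let id: String
    var property1: String?
    init(id: String) { self.id = id }
}

func fetchObjects(completion: @escaping ([Object]) -> Void) {
    performQuery("top") { results in
        let partialObjects = results.map { Object(id: $0) }
        let group = DispatchGroup()
        for partialObject in partialObjects {
            group.enter()                          // one enter per async call
            performQuery(partialObject.id) { results2 in
                partialObject.property1 = results2.first
                group.leave()                      // only after the object is populated
            }
        }
        // Fires once every enter() has been balanced by a leave().
        group.notify(queue: .global()) {
            completion(partialObjects)
        }
    }
}

// Usage: block this script until the completion handler has run.
let done = DispatchSemaphore(value: 0)
var fetched: [Object] = []
fetchObjects { objects in
    fetched = objects
    done.signal()
}
done.wait()
print(fetched.map { $0.property1 ?? "nil" })   // prints ["top11", "top21"]
```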

Okay, well, I see well enough how to chain asynchronous calls in Combine, for example by using Futures and flatMap (pseudocode again):

let future1 = Future...
future1.map {
    // do something
}.flatMap {
    let future2 = Future...
    return future2.map {
        // do something
    }
}
// ...

In that code, the way we form future2 can depend upon the value we received from the execution of future1, and in the map on future2 we can modify what we received from upstream before it gets passed on down the pipeline. No problem. It's all quite beautiful.
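A concrete version of that chain might look like the sketch below. `doubledLater` is a hypothetical stand-in for a Future wrapped around a REST call; it fulfils its promise later on a background queue. The semaphore exists only so the script waits for the value.

```swift
import Combine
import Foundation

// Hypothetical async operation: completes on a background queue with n * 2.
func doubledLater(_ n: Int) -> Future<Int, Never> {
    Future { promise in
        DispatchQueue.global().async {
            promise(.success(n * 2))
        }
    }
}

var cancellables = Set<AnyCancellable>()
var output: [Int] = []
let finished = DispatchSemaphore(value: 0)

doubledLater(5)                    // future1 emits 10
    .map { $0 + 1 }                // transform future1's value: 11
    .flatMap { value in            // future2 is built from that value
        doubledLater(value)        // emits 22
            .map { $0 * 10 }       // transform future2's value: 220
    }
    .sink { value in
        output.append(value)
        finished.signal()
    }
    .store(in: &cancellables)

finished.wait()                    // block this script until the value arrives
print(output)                      // prints [220]
```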

But that doesn't give me what I was doing in the pre-Combine code, namely the loop. Here I was, doing multiple asynchronous calls in a loop, held in place by a DispatchGroup before proceeding. The question is:

What is the Combine pattern for doing that?

Remember the situation. I've got an array of some object. I want to loop through that array, doing an asynchronous call for each object in the loop, fetching new info asynchronously and modifying that object on that basis, before proceeding on down the pipeline. And each loop might involve a further nested loop gathering even more information asynchronously:

Fetch info from online database, it's an array
   |
   V
For each element in the array, fetch _more_ info, _that's_ an array
   |
   V
For each element in _that_ array, fetch _more_ info
   |
   V
Loop thru the accumulated info and populate that element of the original array 

The old code for doing this was horrible-looking, full of nested completion handlers and loops held in place by DispatchGroup enter/leave/notify. But it worked. I can't get my Combine code to work the same way. How do I do it? Basically my pipeline output is an array of something, I feel like I need to split up that array into individual elements, do something asynchronously to each element, and put the elements back together into an array. How?


The way I've been solving this works, but doesn't scale, especially when an asynchronous call needs information that arrived several steps back in the pipeline chain. I've been doing something like this (I got this idea from https://stackoverflow.com/a/58708381/341994):

  1. An array of objects arrives from upstream.

  2. I enter a flatMap and map the array to an array of publishers, each headed by a Future that fetches further online stuff related to one object, and followed by a pipeline that produces the modified object.

  3. Now I have an array of pipelines, each producing a single object. I merge that array and produce that publisher (a MergeMany) from the flatMap.

  4. I collect the resulting values back into an array.
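The four steps above can be sketched roughly as follows. `enrich` is a hypothetical per-element async step and `Just` stands in for the upstream that delivers the array. Note that `MergeMany` emits values as the futures finish, so the collected array is not guaranteed to be in input order.

```swift
import Combine
import Foundation

// Hypothetical per-element async step (e.g., a Future around a REST call).
func enrich(_ name: String) -> Future<String, Never> {
    Future { promise in
        DispatchQueue.global().async { promise(.success(name.uppercased())) }
    }
}

var cancellables = Set<AnyCancellable>()
var result: [String] = []
let finished = DispatchSemaphore(value: 0)

Just(["a", "b", "c"])                                  // 1. an array arrives
    .flatMap { names in
        // 2. one publisher per element, 3. merged into a single stream
        Publishers.MergeMany(names.map { enrich($0) })
    }
    .collect()                                         // 4. back into an array
    .sink { values in
        result = values
        finished.signal()
    }
    .store(in: &cancellables)

finished.wait()
print(result.sorted())   // prints ["A", "B", "C"]
```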

But this still seems like a lot of work, and even worse, it doesn't scale when each sub-pipeline itself needs to spawn an array of sub-pipelines. It all becomes incomprehensible, and information that used to arrive easily into a completion block (because of Swift's scoping rules) no longer arrives into a subsequent step in the main pipeline (or arrives only with difficulty because I pass bigger and bigger tuples down the pipeline).

There must be some simple Combine pattern for doing this, but I'm completely missing it. Please tell me what it is.

matt asked May 16 '20 17:05


2 Answers

With your latest edit and this comment below:

I literally am asking is there a Combine equivalent of "don't proceed to the next step until this step, involving multiple asynchronous steps, has finished"

I think this pattern can be achieved with .flatMap to an array publisher (Publishers.Sequence), which emits the elements one by one and then completes, followed by whatever per-element async processing is needed, and finished off with a .collect, which waits for all elements to complete before proceeding.

So, in code, assuming we have these functions:

func getFoos() -> AnyPublisher<[Foo], Error>
func getPartials(for: Foo) -> AnyPublisher<[Partial], Error>
func getMoreInfo(for: Partial, of: Foo) -> AnyPublisher<MoreInfo, Error>

We can do the following:

getFoos()
    .flatMap { fooArr in
        fooArr.publisher.setFailureType(to: Error.self)
    }

    // per-foo element async processing
    .flatMap { foo in
        getPartials(for: foo)
            .flatMap { partialArr in
                partialArr.publisher.setFailureType(to: Error.self)
            }

            // per-partial of foo async processing
            .flatMap { partial in
                getMoreInfo(for: partial, of: foo)
                    // build completed partial with more info
                    .map { moreInfo -> Partial in
                        var newPartial = partial
                        newPartial.moreInfo = moreInfo
                        return newPartial
                    }
            }
            .collect()
            // build completed foo with all partials
            .map { partialArr -> Foo in
                var newFoo = foo
                newFoo.partials = partialArr
                return newFoo
            }
    }
    .collect()
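To see the nested pattern run end to end, here is a self-contained sketch. The model types `Foo`, `Partial`, `MoreInfo` and the helper `later` are hypothetical; the three `get...` functions are stubs with the signatures assumed above. One caveat worth knowing: `flatMap` subscribes to every per-element publisher concurrently and forwards values as they arrive, so each `collect()` produces its array in completion order, which need not match the input order.

```swift
import Combine
import Foundation

// Hypothetical model types standing in for Foo / Partial / MoreInfo.
struct MoreInfo { let text: String }
struct Partial { let id: Int; var moreInfo: MoreInfo? = nil }
struct Foo { let id: Int; var partials: [Partial] = [] }

// Hypothetical async helper: emits `value` later on a background queue.
func later<T>(_ value: T) -> AnyPublisher<T, Error> {
    Future<T, Error> { promise in
        DispatchQueue.global().async { promise(.success(value)) }
    }
    .eraseToAnyPublisher()
}

// Stub versions of the three functions assumed in the answer.
func getFoos() -> AnyPublisher<[Foo], Error> { later([Foo(id: 1), Foo(id: 2)]) }
func getPartials(for foo: Foo) -> AnyPublisher<[Partial], Error> {
    later([Partial(id: foo.id * 10), Partial(id: foo.id * 10 + 1)])
}
func getMoreInfo(for partial: Partial, of foo: Foo) -> AnyPublisher<MoreInfo, Error> {
    later(MoreInfo(text: "foo \(foo.id) / partial \(partial.id)"))
}

var cancellables = Set<AnyCancellable>()
var foos: [Foo] = []
let finished = DispatchSemaphore(value: 0)

getFoos()
    .flatMap { $0.publisher.setFailureType(to: Error.self) }          // for foo in foos
    .flatMap { foo in
        getPartials(for: foo)
            .flatMap { $0.publisher.setFailureType(to: Error.self) }  // for partial in partials
            .flatMap { partial in
                getMoreInfo(for: partial, of: foo)                     // `foo` is still in scope
                    .map { info -> Partial in
                        var newPartial = partial
                        newPartial.moreInfo = info
                        return newPartial
                    }
            }
            .collect()                                                 // all partials of this foo
            .map { partials -> Foo in
                var newFoo = foo
                newFoo.partials = partials
                return newFoo
            }
    }
    .collect()                                                         // all foos
    .sink(receiveCompletion: { _ in finished.signal() },
          receiveValue: { foos = $0 })
    .store(in: &cancellables)

finished.wait()
print(foos.count)   // prints 2
```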

(Deleted the old answer)

New Dev answered Nov 19 '22 15:11


Using the accepted answer, I wound up with this structure:

head // [Entity]
    .flatMap { entities -> AnyPublisher<Entity, Error> in
        Publishers.Sequence(sequence: entities).eraseToAnyPublisher()
    }
    .flatMap { entity -> AnyPublisher<Entity, Error> in
        self.makeFuture(for: entity) // [Derivative]
            .flatMap { derivatives -> AnyPublisher<Derivative, Error> in
                Publishers.Sequence(sequence: derivatives).eraseToAnyPublisher()
            }
            .flatMap { derivative -> AnyPublisher<Derivative2, Error> in
                self.makeFuture(for: derivative).eraseToAnyPublisher() // Derivative2
            }
            .collect()
            .map { derivative2s -> Entity in
                self.configuredEntity(entity, from: derivative2s)
            }
            .eraseToAnyPublisher()
    }
    .collect()

That has exactly the elegant tightness I was looking for! So the idea is:

We receive an array of something, and we need to process each element asynchronously. The old way would have been a DispatchGroup and a for...in loop. The Combine equivalent is:

  • The equivalent of the for...in line is flatMap and Publishers.Sequence.

  • The equivalent of the DispatchGroup (dealing with asynchrony) is a further flatMap (on the individual element) and some publisher. In my case I start with a Future based on the individual element we just received.

  • The equivalent of the right curly brace at the end is collect(), waiting for all elements to be processed and putting the array back together again.

So to sum up, the pattern is:

  1. flatMap the array to a Sequence.
  2. flatMap the individual element to a publisher that launches the asynchronous operation on that element.
  3. Continue the chain from that publisher as needed.
  4. collect back into an array.
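The four steps can also be packaged into a reusable operator. The sketch below is a hypothetical helper, `asyncMapEach` (not part of Combine), built only from `flatMap`, `Publishers.Sequence`, and `collect()`. It tags each element with its index and sorts after `collect()`, so the output array keeps the input order even though the per-element operations run concurrently. `lengthLater` is a hypothetical async step used only for the usage example.

```swift
import Combine
import Foundation

extension Publisher where Output: Sequence {
    /// Hypothetical helper: runs `transform` on every element of each upstream
    /// array concurrently and emits the results as an array in input order.
    func asyncMapEach<P: Publisher>(
        _ transform: @escaping (Output.Element) -> P
    ) -> AnyPublisher<[P.Output], Failure> where P.Failure == Failure {
        flatMap { elements -> AnyPublisher<[P.Output], Failure> in
            // 1. flatMap the array to a Sequence, remembering each element's index
            let indexed = Array(elements.enumerated())
            return Publishers.Sequence<[(offset: Int, element: Output.Element)], Failure>(sequence: indexed)
                // 2-3. launch the async operation per element, carrying the index along
                .flatMap { item in
                    transform(item.element).map { value in (item.offset, value) }
                }
                // 4. collect back into an array and restore the original order
                .collect()
                .map { pairs in pairs.sorted { $0.0 < $1.0 }.map { $0.1 } }
                .eraseToAnyPublisher()
        }
        .eraseToAnyPublisher()
    }
}

// Hypothetical async step for the usage example.
func lengthLater(_ s: String) -> Future<Int, Never> {
    Future { promise in
        DispatchQueue.global().async { promise(.success(s.count)) }
    }
}

var cancellables = Set<AnyCancellable>()
var lengths: [Int] = []
let finished = DispatchSemaphore(value: 0)

Just(["a", "bbb", "cc"])
    .asyncMapEach(lengthLater)
    .sink { values in
        lengths = values
        finished.signal()
    }
    .store(in: &cancellables)

finished.wait()
print(lengths)   // prints [1, 3, 2]
```

If running the operations one at a time is acceptable, `flatMap(maxPublishers: .max(1))` is a simpler way to keep input order, since it subscribes to the next element's publisher only after the previous one completes; the trade-off is that the operations no longer overlap.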

By nesting that pattern, we can take advantage of Swift scoping rules to keep the thing we need to process in scope until we have acquired enough information to produce the processed object.

matt answered Nov 19 '22 16:11