
Create a Publisher that notifies subscribers one by one, each waiting for the previous one

Tags: swift, combine

I have this publisher and subscribers (example code):

import Combine

let publisher = PassthroughSubject<ComplexStructOrClass, Never>()
let sub1 = publisher.sink { (value) in
    // Async work...
}

let sub2 = publisher.sink { (value) in
    // Async work, but it has to wait until sub1 has finished its work
}

So the publisher constant has two subscribers. When I call send on the publisher, it should deliver the value to sub1 first and, only after sub1 has finished processing (signalled with a callback or something similar), notify sub2.

The comments state that Combine is made for this. Which publisher do I need to use? A PassthroughSubject may be the wrong choice.

Use case

I need to publish values throughout the lifetime of my app to a dynamic number of subscribers, for a few different publishers (I hope I can capture this in a protocol). A subscriber can be added to or removed from a publisher at any time. A subscriber looks as follows:

  • It has an NSPersistentContainer
  • The publisher should call back into the subscriber when a new value arrives. That process looks like this:

    1. The publisher creates a background context from the subscriber's container, because it knows every subscriber has a container.
    2. The publisher sends the context along with the newly published value to the subscriber.
    3. The publisher waits until it receives a callback from the subscriber. The subscriber shouldn't save the context; instead, the publisher must hold a reference to it. The subscriber calls back with an enum, which has an ok case and some error cases.
    4. When a subscriber calls back with an error case, the publisher must roll back the contexts it created for each subscriber.
    5. When a subscriber calls back with the ok case, the publisher repeats steps 1 to 5 for the next subscriber.
    6. This step is only reached when no subscriber returned an error case, or when there are no subscribers. The publisher saves all the contexts it created for the subscribers.
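
The steps above can be sketched in plain Swift, without Combine or Core Data. This is only an illustration under stated assumptions: `Ack`, `FakeContext`, `Listener` and `notifySequentially` are hypothetical names that do not appear in the question, and `FakeContext` stands in for NSManagedObjectContext so that the sketch runs on its own.

```swift
// Hypothetical acknowledgement enum; the question only says it has
// an ok case and some error cases.
enum Ack: Equatable {
    case ok
    case failed(String)
}

// Stand-in for NSManagedObjectContext (assumption, not Core Data).
final class FakeContext {
    private(set) var state = "open"
    func save() { state = "saved" }
    func rollback() { state = "rolledBack" }
}

// A listener receives the value and its own context, and calls `done`
// once its (possibly asynchronous) work has finished.
typealias Listener = (_ value: Int,
                      _ context: FakeContext,
                      _ done: @escaping (Ack) -> Void) -> Void

/// Notifies listeners strictly one after another (steps 1 to 5).
/// Saves every context if all listeners answered ok (step 6),
/// or rolls all of them back on the first error (step 4).
func notifySequentially(_ value: Int,
                        listeners: [Listener],
                        completion: @escaping (Ack) -> Void) {
    var contexts: [FakeContext] = []

    func step(_ index: Int) {
        guard index < listeners.count else {
            // No listener reported an error: commit everything.
            contexts.forEach { $0.save() }
            completion(.ok)
            return
        }
        let context = FakeContext()
        contexts.append(context)
        listeners[index](value, context) { ack in
            if ack == .ok {
                step(index + 1)   // only now is the next listener notified
            } else {
                contexts.forEach { $0.rollback() }
                completion(ack)
            }
        }
    }

    step(0)
}
```

Because the next listener is only invoked from inside the previous listener's completion, no DispatchGroup or blocking wait is needed. With real Core Data contexts, each save or rollback would additionally have to run on the context's own queue.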

Current code, no Combine

This is some code that does not use Combine:

import CoreData

// My publisher
protocol NotiPublisher {

    // Type of message to send
    associatedtype Notification

    // List of subscribers for this publisher
    static var listeners: Set<AnyNotiPublisher<Notification>> { get set }
}

// My subscriber
protocol NotificationListener: Hashable {
    associatedtype NotificationType

    var container: NSPersistentContainer { get }
    // Identifier used to find this subscriber in the list of 'listeners' in the publisher
    var identifier: Int32 { get }
    var notify: ((_ notification: NotificationType, _ context: NSManagedObjectContext, @escaping CompletionHandlerAck) -> ()) { get }
}

// Type-erased version of NotificationListener and some convenience methods go here; I can add them if desired

// This method lives in an extension of NotiPublisher
static func notify(queue: DispatchQueue, notification: Notification, completionHandler: @escaping CompletionHandlerAck) throws {
    let dispatchGroup = DispatchGroup()
    var completionBlocks = [SomeCompletionHandler]()
    var contexts = [NSManagedObjectContext]()
    var didLoop = false

    for listener in listeners {
        // Block until the previous listener has called back before notifying the next one
        if didLoop {
            dispatchGroup.wait()
        } else {
            didLoop = true
        }

        dispatchGroup.enter()

        listener.container.performBackgroundTask { (context) in
            // Keep a reference to the context so it can be saved later
            contexts.append(context)

            listener.notify(notification, context, { (completion) in
                completionBlocks.append(completion)

                dispatchGroup.leave()
            })
        }
    }

    // Runs once the last listener has called back
    dispatchGroup.notify(queue: queue) {
        let err = completionBlocks.first(where: { element in
            // Check if an error has occurred
        })

        // Save every context only when no listener reported an error
        if err == nil {
            for context in contexts {
                context.performAndWait {
                    try! context.save()
                }
            }
        }

        completionHandler(err ?? .ok(true))
    }
}

This code is pretty complex. I am wondering whether I can use the power of Combine to make it more readable.

J. Doe asked Nov 06 '22 13:11


1 Answer

I wrote the following to chain async operations from a publisher using flatMap, which lets you return another publisher. I'm not a fan of it, and it might not meet your need to change the subscribers dynamically, but it might help someone:

import Combine
import Foundation

let somePublisher = Just(12)

let anyCancellable = somePublisher.flatMap { num in
    // flatMap can return a publisher, so return a Future because we want to do some async work
    return Future<Int, Never>({ promise in

        // Do an async thing using dispatch
        DispatchQueue.main.asyncAfter(deadline: .now() + 3, execute: {
            print("flat map finished processing the number \(num)")

            // Now just pass on the value
            promise(.success(num))
        })
    })
}.flatMap { num in
    // Second async step: it only starts once the first Future has delivered its value
    return Future<Int, Never>({ promise in

        // Do an async thing using dispatch
        DispatchQueue.main.asyncAfter(deadline: .now() + 3, execute: {
            print("second flat map finished processing the number \(num)")

            // Now just pass on the value
            promise(.success(num))
        })
    })
}.sink { num in
    print("This sink runs after the async work in the flatMap/Future publishers")
}
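
If the number of async steps is not known up front, the same flatMap/Future idea can be applied to an array of handlers instead of a hand-written chain. The following is a minimal sketch under assumptions: `Handler` and `runInOrder` are hypothetical names, and the sketch relies on `flatMap(maxPublishers: .max(1))` requesting the next upstream element only after the current inner publisher has completed.

```swift
import Combine
import Foundation

// Hypothetical handler type: does some async work for a value,
// then calls `done` when it has finished.
typealias Handler = (_ value: Int, _ done: @escaping () -> Void) -> Void

/// Returns a publisher that runs `handlers` one at a time for `value`
/// and emits a single value after the last handler has called `done`.
func runInOrder(_ value: Int, handlers: [Handler]) -> AnyPublisher<Void, Never> {
    handlers.publisher
        // With .max(1), the next handler is pulled from the array only
        // after the Future of the previous handler has completed.
        .flatMap(maxPublishers: .max(1)) { handler in
            Future<Void, Never> { promise in
                handler(value) { promise(.success(())) }
            }
        }
        .collect()            // wait until every handler has finished
        .map { _ in () }
        .eraseToAnyPublisher()
}
```

A caller would keep the returned cancellable alive, for example `let cancellable = runInOrder(12, handlers: handlers).sink { _ in print("all handlers finished") }`. The handler array can be rebuilt before each call, which allows the set of subscribers to change between published values.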

Jason answered Nov 23 '22 13:11